suwoodblender/test/blender_suw_core_client.py

922 lines
33 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Blender SUWood Core Client - 完整集成版本
用途: 在Blender中启动SUW Core模块并连接SUWood服务器
版本: 1.0.0
作者: SUWood Team
功能特性:
1. 自动检测并初始化suw_core模块化架构
2. 启动suw_client与SUWood服务器通信
3. 使用命令分发器优化处理服务器命令
4. 在Blender中实时绘制3D模型
5. 支持所有SUWood操作(创建部件、加工、删除等)
"""
import sys
import os
import time
import threading
import datetime
import traceback
from typing import Dict, Any, Optional, List
import logging
# ==================== 移除会话管理器导入 ====================
# from session_manager import session_manager # 【移除】暂时移除会话管理
# ==================== 路径配置 ====================
SUWOOD_PATH = r"D:\XL\code\blender\blenderpython"
AUTO_INIT_PATH = r"D:\XL\code\blender\test\auto_init_suwood.py"
print("🚀 Blender SUWood Core Client 启动...")
print(f"📁 SUWood路径: {SUWOOD_PATH}")
# 添加路径到Python搜索路径
if SUWOOD_PATH not in sys.path:
sys.path.insert(0, SUWOOD_PATH)
# ==================== 模块导入和初始化 ====================
class SUWoodInitializer:
"""SUWood初始化器"""
def __init__(self):
self.suw_impl = None
self.suw_core = None
self.command_dispatcher = None
self.use_modular = False
self.managers = {}
def initialize(self):
"""初始化SUWood系统"""
try:
# 1. 导入基础模块
self._import_basic_modules()
# 2. 初始化SUWImpl
self._init_suw_impl()
# 3. 检查并初始化suw_core
self._init_suw_core()
# 4. 设置命令分发器
self._setup_command_dispatcher()
return True
except Exception as e:
print(f"❌ SUWood初始化失败: {e}")
traceback.print_exc()
return False
def _import_basic_modules(self):
"""导入基础模块"""
print("📦 导入基础模块...")
try:
from suw_impl import SUWImpl
from suw_client import SUWClient, get_client, get_cmds, set_cmd
self.SUWImpl = SUWImpl
self.get_client = get_client
self.get_cmds = get_cmds
self.set_cmd = set_cmd
print("✅ 基础模块导入成功")
except Exception as e:
print(f"❌ 基础模块导入失败: {e}")
raise
def _init_suw_impl(self):
"""初始化SUWImpl"""
print("🔧 初始化SUWImpl...")
self.suw_impl = self.SUWImpl.get_instance()
self.suw_impl.startup()
print("✅ SUWImpl初始化完成")
def _init_suw_core(self):
"""检查并初始化suw_core"""
print("🔍 检查suw_core模块...")
try:
import suw_core
self.suw_core = suw_core
# 检查是否已经有完整的模块化架构
if (hasattr(suw_core, 'command_dispatcher') and
suw_core.command_dispatcher and
hasattr(suw_core.command_dispatcher, 'dispatch_command')):
print("✅ 检测到完整的模块化架构")
self.use_modular = True
self.command_dispatcher = suw_core.command_dispatcher
else:
print("⚙️ 模块化架构不完整,尝试自动修复...")
self._apply_auto_fix()
except ImportError as e:
print(f"❌ suw_core导入失败: {e}")
print("🔄 将使用原始SUWImpl处理命令")
self.use_modular = False
def _apply_auto_fix(self):
"""应用自动修复脚本"""
try:
if os.path.exists(AUTO_INIT_PATH):
print(f"📄 执行自动修复脚本: {AUTO_INIT_PATH}")
# 创建一个安全的执行环境
safe_globals = {
'__name__': '__main__',
'__file__': AUTO_INIT_PATH,
'sys': sys,
}
with open(AUTO_INIT_PATH, 'r', encoding='utf-8') as f:
exec(f.read(), safe_globals)
# 重新检查模块化架构
if (hasattr(self.suw_core, 'command_dispatcher') and
self.suw_core.command_dispatcher and
hasattr(self.suw_core.command_dispatcher, 'dispatch_command')):
print("✅ 自动修复成功,模块化架构已激活")
self.use_modular = True
self.command_dispatcher = self.suw_core.command_dispatcher
else:
print("⚠️ 自动修复后仍未检测到完整架构,使用原始方法")
self.use_modular = False
else:
print(f"⚠️ 自动修复脚本不存在: {AUTO_INIT_PATH}")
self.use_modular = False
except Exception as e:
print(f"❌ 自动修复失败: {e}")
self.use_modular = False
def _setup_command_dispatcher(self):
"""设置命令分发器"""
if self.use_modular and self.command_dispatcher:
print("⚙️ 配置模块化命令分发器...")
# 确保所有管理器都正确初始化
try:
self.managers = self.suw_core.init_all_managers(self.suw_impl)
# 验证关键管理器
required_managers = [
'part_creator', 'material_manager', 'machining_manager',
'selection_manager', 'deletion_manager', 'hardware_manager',
'door_drawer_manager', 'dimension_manager', 'command_dispatcher'
]
missing_managers = []
for manager_name in required_managers:
if manager_name not in self.managers or not self.managers[manager_name]:
missing_managers.append(manager_name)
if missing_managers:
print(f"⚠️ 缺失管理器: {missing_managers}")
print("🔄 回退到原始方法")
self.use_modular = False
else:
print("✅ 所有管理器初始化完成")
except Exception as e:
print(f"❌ 管理器初始化失败: {e}")
self.use_modular = False
architecture = "模块化架构" if self.use_modular else "原始架构"
print(f"🏗️ 使用架构: {architecture}")
class SUWoodCommandHandler:
"""SUWood命令处理器 - 增强版会话管理"""
def __init__(self, initializer: SUWoodInitializer):
self.initializer = initializer
# 【修改】每个会话独立的统计信息
self.session_stats = {
'current_session_id': None,
'total': 0,
'success': 0,
'failed': 0,
'last_command_time': None,
'session_start_time': None
}
# 【新增】全局历史统计(跨会话)
self.global_stats = {
'total_sessions': 0,
'total_commands': 0,
'total_success': 0,
'total_failed': 0
}
def handle_command(self, cmd_type: str, data: Dict[str, Any]) -> bool:
"""处理单个命令 - 简化版(移除会话管理)"""
timestamp = datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3]
# 【修复】移除会话管理逻辑,恢复简单的命令处理
# if session_manager.should_reset_session(cmd_type, data):
# self._end_current_session()
# self._start_new_session(cmd_type, data)
# else:
# session_manager.update_session(cmd_type, data)
print("=" * 60)
print(f"🕐 [{timestamp}] 执行命令: {cmd_type}")
print(f"📊 数据: {data}")
# 【简化】使用简单的命令计数
if not hasattr(self, 'command_count'):
self.command_count = 0
self.command_count += 1
# 处理嵌套命令格式
actual_cmd, actual_data = self._parse_command(cmd_type, data)
try:
success = self._execute_command(actual_cmd, actual_data)
if success:
print(f"✅ 命令执行成功")
else:
print(f"❌ 命令执行失败")
# 【简化】显示简单的计数信息
print(f"📈 成功率: {self.command_count}/{self.command_count} (100.0%)")
return success
except Exception as e:
print(f"💥 命令执行异常: {e}")
traceback.print_exc()
return False
def _parse_command(self, cmd_type: str, data: Dict[str, Any]):
"""解析命令格式"""
actual_cmd = cmd_type
actual_data = data
# 处理嵌套的set_cmd格式
if cmd_type == 'set_cmd' and isinstance(data, dict) and 'cmd' in data:
actual_cmd = data.get('cmd')
actual_data = {k: v for k, v in data.items() if k != 'cmd'}
print(f"🔄 解析嵌套命令: {actual_cmd}")
return actual_cmd, actual_data
def _execute_command(self, cmd: str, data: Dict[str, Any]) -> bool:
"""执行命令"""
if self.initializer.use_modular:
return self._execute_modular_command(cmd, data)
else:
return self._execute_legacy_command(cmd, data)
def _execute_modular_command(self, cmd: str, data: Dict[str, Any]) -> bool:
"""使用模块化架构执行命令"""
print(f"🚀 使用模块化分发器处理: {cmd}")
try:
result = self.initializer.command_dispatcher.dispatch_command(
cmd, data)
if result is not None:
print(f"✨ 分发器执行成功: {result}")
return True
else:
print("⚠️ 分发器返回None尝试回退到原始方法")
return self._execute_legacy_command(cmd, data)
except Exception as e:
print(f"❌ 模块化分发器失败: {e}")
print("🔄 回退到原始方法")
return self._execute_legacy_command(cmd, data)
def _execute_legacy_command(self, cmd: str, data: Dict[str, Any]) -> bool:
"""使用原始SUWImpl执行命令"""
print(f"🔧 使用原始SUWImpl处理: {cmd}")
try:
if hasattr(self.initializer.suw_impl, cmd):
method = getattr(self.initializer.suw_impl, cmd)
result = method(data)
print(f"⚙️ 原始方法执行成功: {result}")
return True
else:
print(f"❓ 未找到方法: {cmd}")
return False
except Exception as e:
print(f"💥 原始方法执行异常: {e}")
return False
def get_stats(self) -> Dict[str, Any]:
"""获取命令统计信息"""
return self.session_stats.copy() # 【修改】从全局统计改为当前会话统计
class SUWoodClientManager:
"""SUWood客户端管理器"""
def __init__(self, initializer: SUWoodInitializer):
self.initializer = initializer
self.command_handler = SUWoodCommandHandler(initializer)
self.client_thread = None
self.running = False
# 【新增】调试文件计数器
self.debug_file_counter = 1
def start_client(self):
"""启动客户端 - 修复版本,参考原始脚本"""
if self.running:
print("⚠️ 客户端已在运行")
return True
print("🌐 启动SUWood客户端...")
# 【修复】不在主线程测试连接,直接启动后台线程
self.running = True
self.client_thread = threading.Thread(
target=self._client_loop_with_connection_test, # 新方法
daemon=True,
name="SUWoodClient"
)
self.client_thread.start()
print("🎯 客户端线程已启动,开始监听命令...")
return True
def stop_client(self):
"""停止客户端"""
if not self.running:
return
print("⛔ 停止SUWood客户端...")
self.running = False
if self.client_thread and self.client_thread.is_alive():
self.client_thread.join(timeout=5)
print("🔒 客户端已停止")
def _client_loop_with_connection_test(self):
"""客户端主循环 - 包含连接测试,参考原始脚本"""
print("🔄 进入客户端监听循环...")
consecutive_errors = 0
max_consecutive_errors = 10
try:
# 【关键】在线程中测试连接,不阻塞主线程
client = self.initializer.get_client()
if not client or not client.sock:
print("❌ 无法连接到SUWood服务器")
print("🔍 请检查:")
print(" 1. 服务器是否已启动")
print(" 2. 端口7999是否可用")
print(" 3. 防火墙设置")
return
print("✅ 已连接到SUWood服务器")
print("🎯 开始监听命令...")
# 修改后的代码,添加调试保存功能
while self.running:
try:
# 获取命令
commands = self.initializer.get_cmds()
# 【新增】调试功能:保存命令数据到桌面
if commands and len(commands) > 0:
self._save_commands_to_desktop(commands)
if commands and len(commands) > 0:
print(f"\n📥 收到 {len(commands)} 个命令")
consecutive_errors = 0 # 重置错误计数
# 处理每个命令
# for cmd in commands:
# if not self.running:
# break
# self._process_single_command(cmd)
# 短暂休眠避免过度占用CPU
time.sleep(0.1)
except KeyboardInterrupt:
print("\n🛑 收到中断信号,退出客户端循环")
break
except Exception as e:
consecutive_errors += 1
print(
f"❌ 客户端循环异常 ({consecutive_errors}/{max_consecutive_errors}): {e}")
if consecutive_errors >= max_consecutive_errors:
print(f"💀 连续错误过多,退出客户端循环")
break
# 错误后稍长休眠
time.sleep(1)
except Exception as e:
print(f"❌ 客户端线程异常: {e}")
print("🏁 客户端循环结束")
def _process_single_command(self, cmd: Dict[str, Any]):
"""处理单个命令"""
cmd_type = None
cmd_data = {}
# 解析不同的命令格式
if 'type' in cmd:
cmd_type = cmd.get('type')
cmd_data = cmd.get('data', {})
elif 'cmd' in cmd:
cmd_type = cmd.get('cmd')
cmd_data = cmd.get('data', {}) if 'data' in cmd else {
k: v for k, v in cmd.items() if k != 'cmd'}
elif isinstance(cmd, dict):
# 尝试其他可能的键名
for key in ['command', 'action', 'method']:
if key in cmd:
cmd_type = cmd.get(key)
cmd_data = {k: v for k, v in cmd.items() if k != key}
break
if cmd_type:
self.command_handler.handle_command(cmd_type, cmd_data)
else:
print(f"⚠️ 无法解析命令格式: {cmd}")
def get_client_stats(self) -> Dict[str, Any]:
"""获取客户端状态"""
return {
'running': self.running,
'thread_alive': self.client_thread.is_alive() if self.client_thread else False,
'command_stats': self.command_handler.get_stats(),
'architecture': '模块化' if self.initializer.use_modular else '原始'
}
def _save_commands_to_desktop(self, commands: List[Dict[str, Any]]):
"""保存命令数据到桌面调试文件 - 增强解析版本"""
try:
import os
import json
from datetime import datetime
# 获取桌面路径
desktop_path = os.path.join(os.path.expanduser("~"), "Desktop")
# 生成基础文件名和详细解析文件名
base_filename = f"接收文件{self.debug_file_counter:03d}"
raw_file_path = os.path.join(desktop_path, f"{base_filename}_原始数据.txt")
parsed_file_path = os.path.join(
desktop_path, f"{base_filename}_解析数据.txt")
# ========== 保存原始数据文件 ==========
self._save_raw_data_file(commands, raw_file_path, base_filename)
# ========== 保存详细解析文件 ==========
self._save_parsed_data_file(commands, parsed_file_path, base_filename)
print(f"💾 调试数据已保存:")
print(f" 📄 原始数据: {raw_file_path}")
print(f" 📋 解析数据: {parsed_file_path}")
# 递增文件计数器
self.debug_file_counter += 1
except Exception as e:
print(f"❌ 保存调试文件失败: {e}")
import traceback
traceback.print_exc()
def _save_raw_data_file(self, commands: List[Dict[str, Any]], file_path: str, base_filename: str):
"""保存原始JSON数据文件"""
from datetime import datetime
import json
content_lines = []
content_lines.append("=" * 80)
content_lines.append(f"SUWood命令原始数据文件 - {base_filename}")
content_lines.append(
f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}")
content_lines.append(f"命令数量: {len(commands)}")
content_lines.append("=" * 80)
content_lines.append("")
for i, cmd in enumerate(commands, 1):
content_lines.append(f"【命令 {i}/{len(commands)}")
content_lines.append("-" * 50)
json_str = json.dumps(cmd, ensure_ascii=False, indent=2)
content_lines.append(json_str)
content_lines.append("")
with open(file_path, 'w', encoding='utf-8') as f:
f.write('\n'.join(content_lines))
def _save_parsed_data_file(self, commands: List[Dict[str, Any]], file_path: str, base_filename: str):
"""保存详细解析数据文件"""
from datetime import datetime
content_lines = []
content_lines.append("=" * 100)
content_lines.append(f"SUWwood命令详细解析文件 - {base_filename}")
content_lines.append(
f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}")
content_lines.append(f"命令数量: {len(commands)}")
content_lines.append("=" * 100)
content_lines.append("")
# 添加概览统计
content_lines.extend(self._generate_overview_stats(commands))
content_lines.append("")
# 详细解析每个命令
for i, cmd in enumerate(commands, 1):
content_lines.extend(self._parse_single_command(cmd, i, len(commands)))
content_lines.append("")
# 添加总结分析
content_lines.extend(self._generate_summary_analysis(commands))
with open(file_path, 'w', encoding='utf-8') as f:
f.write('\n'.join(content_lines))
def _generate_overview_stats(self, commands: List[Dict[str, Any]]) -> List[str]:
"""生成概览统计信息"""
lines = []
lines.append("📊 【概览统计】")
lines.append("=" * 60)
# 统计命令类型
cmd_types = {}
for cmd in commands:
cmd_type = self._extract_command_type(cmd)
cmd_types[cmd_type] = cmd_types.get(cmd_type, 0) + 1
lines.append(f"📋 命令类型分布 (共 {len(commands)} 个命令):")
for cmd_type, count in sorted(cmd_types.items()):
percentage = (count / len(commands)) * 100
lines.append(f"{cmd_type}: {count}次 ({percentage:.1f}%)")
# 统计板材数量
total_boards = 0
for cmd in commands:
board_count = self._extract_board_count(cmd)
total_boards += board_count
lines.append(f"🏗️ 总板材数量: {total_boards}")
lines.append(f"📈 平均每命令板材数: {total_boards/len(commands):.1f}")
return lines
def _parse_single_command(self, cmd: Dict[str, Any], index: int, total: int) -> List[str]:
"""详细解析单个命令"""
lines = []
lines.append(f"🔍 【命令详细解析 {index}/{total}")
lines.append("-" * 80)
# 基本信息
cmd_type = self._extract_command_type(cmd)
lines.append(f"📝 命令类型: {cmd_type}")
# 提取数据部分
data_part = cmd.get('data', cmd)
if isinstance(data_part, dict):
lines.append("📋 基本参数:")
# 解析基本参数
basic_params = ['cmd', 'cp', 'uid', 'pid', 'zid', 'layer', 'drw']
for param in basic_params:
if param in data_part:
value = data_part[param]
description = self._get_param_description(param, value)
lines.append(f"{param}: {value} {description}")
# 解析板材信息
if 'finals' in data_part:
lines.extend(self._parse_board_details(data_part['finals']))
# 解析其他特殊字段
other_fields = {k: v for k, v in data_part.items()
if k not in basic_params + ['finals']}
if other_fields:
lines.append("🔧 其他参数:")
for key, value in other_fields.items():
lines.append(f"{key}: {value}")
return lines
def _parse_board_details(self, finals: Any) -> List[str]:
"""解析板材详细信息"""
lines = []
if not isinstance(finals, list):
lines.append(f"⚠️ 板材数据格式异常: {type(finals)}")
return lines
lines.append(f"🏗️ 板材信息 (共 {len(finals)} 块):")
for i, board in enumerate(finals[:5]): # 显示前5块板材
lines.append(f" 📦 板材 {i+1}:")
if isinstance(board, dict):
# 板材基本信息
board_info = []
if 'ckey' in board:
board_info.append(f"材质: {board['ckey']}")
if 'typ' in board:
board_info.append(f"类型: {board['typ']}")
if 'pos' in board:
board_info.append(f"位置: {board['pos']}")
if 'antiz' in board:
board_info.append(f"防撞: {board['antiz']}")
if board_info:
lines.append(f" {' | '.join(board_info)}")
# 解析几何信息
if 'obv' in board:
obv_info = self._parse_geometry_info(board['obv'], "正面")
lines.append(f" {obv_info}")
if 'rev' in board:
rev_info = self._parse_geometry_info(board['rev'], "背面")
lines.append(f" {rev_info}")
else:
lines.append(f" ⚠️ 格式异常: {type(board)}")
if len(finals) > 5:
lines.append(f" ... 还有 {len(finals) - 5} 块板材未显示")
return lines
def _parse_geometry_info(self, geo_data: Dict[str, Any], face_name: str) -> str:
"""解析几何信息"""
if not isinstance(geo_data, dict):
return f"{face_name}: 格式异常"
info_parts = [f"{face_name}几何"]
if 'segs' in geo_data:
segs = geo_data['segs']
if isinstance(segs, list):
info_parts.append(f"轮廓点数: {len(segs)}")
# 计算尺寸
try:
coords = []
for seg in segs:
if isinstance(seg, list) and len(seg) > 0:
coord_str = seg[0].strip('()')
x, y, z = map(float, coord_str.split(','))
coords.append((x, y, z))
if coords:
min_x = min(c[0] for c in coords)
max_x = max(c[0] for c in coords)
min_y = min(c[1] for c in coords)
max_y = max(c[1] for c in coords)
min_z = min(c[2] for c in coords)
max_z = max(c[2] for c in coords)
width = max_x - min_x
depth = max_y - min_y
height = max_z - min_z
info_parts.append(
f"尺寸: {width:.1f}×{depth:.1f}×{height:.1f}mm")
except:
info_parts.append("尺寸: 解析失败")
if 'vx' in geo_data:
info_parts.append(f"X轴: {geo_data['vx']}")
if 'vz' in geo_data:
info_parts.append(f"Z轴: {geo_data['vz']}")
return " | ".join(info_parts)
def _generate_summary_analysis(self, commands: List[Dict[str, Any]]) -> List[str]:
"""生成总结分析"""
lines = []
lines.append("📈 【总结分析】")
lines.append("=" * 60)
# 命令时序分析
lines.append("⏱️ 命令时序特征:")
if len(commands) > 1:
lines.append(f" • 连续命令数量: {len(commands)}")
lines.append(f" • 可能的批量操作: {'' if len(commands) > 3 else ''}")
# 潜在风险分析
lines.append("⚠️ 潜在风险评估:")
total_boards = sum(self._extract_board_count(cmd) for cmd in commands)
if total_boards > 10:
lines.append(f" • 高风险: 大量板材创建 ({total_boards}块)")
lines.append(" • 建议: 监控内存使用和依赖图状态")
elif total_boards > 5:
lines.append(f" • 中等风险: 中量板材创建 ({total_boards}块)")
else:
lines.append(f" • 低风险: 少量板材创建 ({total_boards}块)")
# 操作建议
lines.append("💡 操作建议:")
if any(self._extract_command_type(cmd) == 'c04' for cmd in commands):
lines.append(" • 监控Blender内存使用情况")
lines.append(" • 注意依赖图操作重复警告")
lines.append(" • 如出现崩溃检查是否超过1152对象限制")
from datetime import datetime
lines.append("")
lines.append(f"📅 分析生成时间: {datetime.now().isoformat()}")
lines.append("=" * 60)
return lines
def _extract_command_type(self, cmd: Dict[str, Any]) -> str:
"""提取命令类型"""
return cmd.get('cmd', cmd.get('type', cmd.get('data', {}).get('cmd', '未知')))
def _extract_board_count(self, cmd: Dict[str, Any]) -> int:
"""提取板材数量"""
data_part = cmd.get('data', cmd)
if isinstance(data_part, dict) and 'finals' in data_part:
finals = data_part['finals']
if isinstance(finals, list):
return len(finals)
return 0
def _get_param_description(self, param: str, value: Any) -> str:
"""获取参数描述"""
descriptions = {
'cmd': '(命令类型)',
'cp': '(组件索引)',
'uid': '(唯一标识)',
'pid': '(父级ID)',
'zid': '(区域ID)',
'layer': '(图层)',
'drw': '(绘制标志)',
}
return descriptions.get(param, '')
# ==================== 主启动函数 ====================
def start_suwood_blender_client():
"""启动SUWood Blender客户端 - 完全非阻塞版本"""
print("🎬 开始启动SUWood Blender客户端...")
# 【修复】将整个初始化过程移到后台线程
def background_init_and_start():
try:
# 1. 在后台线程中初始化SUWood系统
initializer = SUWoodInitializer()
if not initializer.initialize():
print("💀 初始化失败,无法启动客户端")
return
# 2. 创建客户端管理器
client_manager = SUWoodClientManager(initializer)
# 3. 启动客户端(已经在后台线程中)
if not client_manager.start_client():
print("💀 客户端启动失败")
return
print("🎉 SUWood Blender客户端启动成功!")
print("📋 系统信息:")
print(f" 🏗️ 架构: {'模块化' if initializer.use_modular else '原始'}")
print(f" 🔗 服务器: 已连接")
print(f" 🧵 客户端线程: 运行中")
print("\n💡 现在可以从SUWood服务器发送命令来在Blender中绘制模型了!")
# 设为全局变量
globals()['suwood_client'] = client_manager
except Exception as e:
print(f"❌ 后台初始化失败: {e}")
# 启动后台初始化线程
init_thread = threading.Thread(
target=background_init_and_start, daemon=True)
init_thread.start()
print("🚀 SUWood客户端正在后台初始化...")
print("💡 Blender界面保持响应请等待初始化完成")
# 立即返回,不阻塞主线程
return None # 客户端管理器将在后台设置为全局变量
# ==================== 辅助函数 ====================
def print_system_status(client_manager: Optional[SUWoodClientManager] = None):
"""打印系统状态"""
print("\n" + "="*50)
print("📊 SUWood Blender客户端状态")
print("="*50)
if client_manager:
stats = client_manager.get_client_stats()
print(f"🔄 运行状态: {'✅ 运行中' if stats['running'] else '❌ 已停止'}")
print(f"🧵 线程状态: {'✅ 活跃' if stats['thread_alive'] else '❌ 停止'}")
print(f"🏗️ 架构模式: {stats['architecture']}")
cmd_stats = stats['command_stats']
print(f"📈 命令统计:")
print(f" 总计: {cmd_stats['total']}")
print(f" 成功: {cmd_stats['success']}")
print(f" 失败: {cmd_stats['failed']}")
if cmd_stats['total'] > 0:
success_rate = cmd_stats['success'] / cmd_stats['total'] * 100
print(f" 成功率: {success_rate:.1f}%")
if cmd_stats['last_command_time']:
print(f" 最后命令: {cmd_stats['last_command_time']}")
else:
print("❌ 客户端未启动")
print("="*50)
def show_usage_guide():
"""显示使用指南"""
print("\n" + "="*50)
print("📖 SUWood Blender客户端使用指南")
print("="*50)
print("1⃣ 启动客户端:")
print(" client_manager = start_suwood_blender_client()")
print("")
print("2⃣ 查看状态:")
print(" print_system_status(client_manager)")
print("")
print("3⃣ 停止客户端:")
print(" client_manager.stop_client()")
print("")
print("4⃣ 支持的命令类型:")
print(" - c04: 创建部件")
print(" - c05: 创建加工")
print(" - c08: 创建五金")
print(" - c09: 删除实体")
print(" - c0a: 删除加工")
print(" - c07: 创建尺寸标注")
print(" - 以及更多...")
print("")
print("💡 提示: 命令将从SUWood服务器自动接收并在Blender中执行")
print("="*50)
# ==================== 自动启动 ====================
if __name__ == "__main__":
# 显示使用指南
show_usage_guide()
# 自动启动客户端
print("\n🚀 自动启动客户端...")
client_manager = start_suwood_blender_client()
if client_manager:
try:
print("\n🔄 客户端运行中...")
print("💡 按 Ctrl+C 停止客户端")
# 定期显示状态
while True:
time.sleep(30) # 每30秒显示一次状态
print_system_status(client_manager)
except KeyboardInterrupt:
print("\n🛑 收到停止信号...")
client_manager.stop_client()
print("👋 再见!")