suwoodblender/test/blender_suw_core_client.py

922 lines
33 KiB
Python
Raw Permalink Normal View History

2025-07-18 17:09:39 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Blender SUWood Core Client - 完整集成版本
用途: 在Blender中启动SUW Core模块并连接SUWood服务器
版本: 1.0.0
作者: SUWood Team
功能特性:
1. 自动检测并初始化suw_core模块化架构
2. 启动suw_client与SUWood服务器通信
3. 使用命令分发器优化处理服务器命令
4. 在Blender中实时绘制3D模型
5. 支持所有SUWood操作(创建部件加工删除等)
"""
import sys
import os
import time
import threading
import datetime
import traceback
from typing import Dict, Any, Optional, List
import logging
# ==================== 移除会话管理器导入 ====================
# from session_manager import session_manager # 【移除】暂时移除会话管理
# ==================== 路径配置 ====================
SUWOOD_PATH = r"D:\XL\code\blender\blenderpython"
AUTO_INIT_PATH = r"D:\XL\code\blender\test\auto_init_suwood.py"
print("🚀 Blender SUWood Core Client 启动...")
print(f"📁 SUWood路径: {SUWOOD_PATH}")
# 添加路径到Python搜索路径
if SUWOOD_PATH not in sys.path:
sys.path.insert(0, SUWOOD_PATH)
# ==================== 模块导入和初始化 ====================
class SUWoodInitializer:
"""SUWood初始化器"""
def __init__(self):
self.suw_impl = None
self.suw_core = None
self.command_dispatcher = None
self.use_modular = False
self.managers = {}
def initialize(self):
"""初始化SUWood系统"""
try:
# 1. 导入基础模块
self._import_basic_modules()
# 2. 初始化SUWImpl
self._init_suw_impl()
# 3. 检查并初始化suw_core
self._init_suw_core()
# 4. 设置命令分发器
self._setup_command_dispatcher()
return True
except Exception as e:
print(f"❌ SUWood初始化失败: {e}")
traceback.print_exc()
return False
def _import_basic_modules(self):
"""导入基础模块"""
print("📦 导入基础模块...")
try:
from suw_impl import SUWImpl
from suw_client import SUWClient, get_client, get_cmds, set_cmd
self.SUWImpl = SUWImpl
self.get_client = get_client
self.get_cmds = get_cmds
self.set_cmd = set_cmd
print("✅ 基础模块导入成功")
except Exception as e:
print(f"❌ 基础模块导入失败: {e}")
raise
def _init_suw_impl(self):
"""初始化SUWImpl"""
print("🔧 初始化SUWImpl...")
self.suw_impl = self.SUWImpl.get_instance()
self.suw_impl.startup()
print("✅ SUWImpl初始化完成")
def _init_suw_core(self):
"""检查并初始化suw_core"""
print("🔍 检查suw_core模块...")
try:
import suw_core
self.suw_core = suw_core
# 检查是否已经有完整的模块化架构
if (hasattr(suw_core, 'command_dispatcher') and
suw_core.command_dispatcher and
hasattr(suw_core.command_dispatcher, 'dispatch_command')):
print("✅ 检测到完整的模块化架构")
self.use_modular = True
self.command_dispatcher = suw_core.command_dispatcher
else:
print("⚙️ 模块化架构不完整,尝试自动修复...")
self._apply_auto_fix()
except ImportError as e:
print(f"❌ suw_core导入失败: {e}")
print("🔄 将使用原始SUWImpl处理命令")
self.use_modular = False
def _apply_auto_fix(self):
"""应用自动修复脚本"""
try:
if os.path.exists(AUTO_INIT_PATH):
print(f"📄 执行自动修复脚本: {AUTO_INIT_PATH}")
# 创建一个安全的执行环境
safe_globals = {
'__name__': '__main__',
'__file__': AUTO_INIT_PATH,
'sys': sys,
}
with open(AUTO_INIT_PATH, 'r', encoding='utf-8') as f:
exec(f.read(), safe_globals)
# 重新检查模块化架构
if (hasattr(self.suw_core, 'command_dispatcher') and
self.suw_core.command_dispatcher and
hasattr(self.suw_core.command_dispatcher, 'dispatch_command')):
print("✅ 自动修复成功,模块化架构已激活")
self.use_modular = True
self.command_dispatcher = self.suw_core.command_dispatcher
else:
print("⚠️ 自动修复后仍未检测到完整架构,使用原始方法")
self.use_modular = False
else:
print(f"⚠️ 自动修复脚本不存在: {AUTO_INIT_PATH}")
self.use_modular = False
except Exception as e:
print(f"❌ 自动修复失败: {e}")
self.use_modular = False
def _setup_command_dispatcher(self):
"""设置命令分发器"""
if self.use_modular and self.command_dispatcher:
print("⚙️ 配置模块化命令分发器...")
# 确保所有管理器都正确初始化
try:
self.managers = self.suw_core.init_all_managers(self.suw_impl)
# 验证关键管理器
required_managers = [
'part_creator', 'material_manager', 'machining_manager',
'selection_manager', 'deletion_manager', 'hardware_manager',
'door_drawer_manager', 'dimension_manager', 'command_dispatcher'
]
missing_managers = []
for manager_name in required_managers:
if manager_name not in self.managers or not self.managers[manager_name]:
missing_managers.append(manager_name)
if missing_managers:
print(f"⚠️ 缺失管理器: {missing_managers}")
print("🔄 回退到原始方法")
self.use_modular = False
else:
print("✅ 所有管理器初始化完成")
except Exception as e:
print(f"❌ 管理器初始化失败: {e}")
self.use_modular = False
architecture = "模块化架构" if self.use_modular else "原始架构"
print(f"🏗️ 使用架构: {architecture}")
class SUWoodCommandHandler:
"""SUWood命令处理器 - 增强版会话管理"""
def __init__(self, initializer: SUWoodInitializer):
self.initializer = initializer
# 【修改】每个会话独立的统计信息
self.session_stats = {
'current_session_id': None,
'total': 0,
'success': 0,
'failed': 0,
'last_command_time': None,
'session_start_time': None
}
# 【新增】全局历史统计(跨会话)
self.global_stats = {
'total_sessions': 0,
'total_commands': 0,
'total_success': 0,
'total_failed': 0
}
def handle_command(self, cmd_type: str, data: Dict[str, Any]) -> bool:
"""处理单个命令 - 简化版(移除会话管理)"""
timestamp = datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3]
# 【修复】移除会话管理逻辑,恢复简单的命令处理
# if session_manager.should_reset_session(cmd_type, data):
# self._end_current_session()
# self._start_new_session(cmd_type, data)
# else:
# session_manager.update_session(cmd_type, data)
print("=" * 60)
print(f"🕐 [{timestamp}] 执行命令: {cmd_type}")
print(f"📊 数据: {data}")
# 【简化】使用简单的命令计数
if not hasattr(self, 'command_count'):
self.command_count = 0
self.command_count += 1
# 处理嵌套命令格式
actual_cmd, actual_data = self._parse_command(cmd_type, data)
try:
success = self._execute_command(actual_cmd, actual_data)
if success:
print(f"✅ 命令执行成功")
else:
print(f"❌ 命令执行失败")
# 【简化】显示简单的计数信息
print(f"📈 成功率: {self.command_count}/{self.command_count} (100.0%)")
return success
except Exception as e:
print(f"💥 命令执行异常: {e}")
traceback.print_exc()
return False
def _parse_command(self, cmd_type: str, data: Dict[str, Any]):
"""解析命令格式"""
actual_cmd = cmd_type
actual_data = data
# 处理嵌套的set_cmd格式
if cmd_type == 'set_cmd' and isinstance(data, dict) and 'cmd' in data:
actual_cmd = data.get('cmd')
actual_data = {k: v for k, v in data.items() if k != 'cmd'}
print(f"🔄 解析嵌套命令: {actual_cmd}")
return actual_cmd, actual_data
def _execute_command(self, cmd: str, data: Dict[str, Any]) -> bool:
"""执行命令"""
if self.initializer.use_modular:
return self._execute_modular_command(cmd, data)
else:
return self._execute_legacy_command(cmd, data)
def _execute_modular_command(self, cmd: str, data: Dict[str, Any]) -> bool:
"""使用模块化架构执行命令"""
print(f"🚀 使用模块化分发器处理: {cmd}")
try:
result = self.initializer.command_dispatcher.dispatch_command(
cmd, data)
if result is not None:
print(f"✨ 分发器执行成功: {result}")
return True
else:
print("⚠️ 分发器返回None尝试回退到原始方法")
return self._execute_legacy_command(cmd, data)
except Exception as e:
print(f"❌ 模块化分发器失败: {e}")
print("🔄 回退到原始方法")
return self._execute_legacy_command(cmd, data)
def _execute_legacy_command(self, cmd: str, data: Dict[str, Any]) -> bool:
"""使用原始SUWImpl执行命令"""
print(f"🔧 使用原始SUWImpl处理: {cmd}")
try:
if hasattr(self.initializer.suw_impl, cmd):
method = getattr(self.initializer.suw_impl, cmd)
result = method(data)
print(f"⚙️ 原始方法执行成功: {result}")
return True
else:
print(f"❓ 未找到方法: {cmd}")
return False
except Exception as e:
print(f"💥 原始方法执行异常: {e}")
return False
def get_stats(self) -> Dict[str, Any]:
"""获取命令统计信息"""
return self.session_stats.copy() # 【修改】从全局统计改为当前会话统计
class SUWoodClientManager:
"""SUWood客户端管理器"""
def __init__(self, initializer: SUWoodInitializer):
self.initializer = initializer
self.command_handler = SUWoodCommandHandler(initializer)
self.client_thread = None
self.running = False
# 【新增】调试文件计数器
self.debug_file_counter = 1
def start_client(self):
"""启动客户端 - 修复版本,参考原始脚本"""
if self.running:
print("⚠️ 客户端已在运行")
return True
print("🌐 启动SUWood客户端...")
# 【修复】不在主线程测试连接,直接启动后台线程
self.running = True
self.client_thread = threading.Thread(
target=self._client_loop_with_connection_test, # 新方法
daemon=True,
name="SUWoodClient"
)
self.client_thread.start()
print("🎯 客户端线程已启动,开始监听命令...")
return True
def stop_client(self):
"""停止客户端"""
if not self.running:
return
print("⛔ 停止SUWood客户端...")
self.running = False
if self.client_thread and self.client_thread.is_alive():
self.client_thread.join(timeout=5)
print("🔒 客户端已停止")
def _client_loop_with_connection_test(self):
"""客户端主循环 - 包含连接测试,参考原始脚本"""
print("🔄 进入客户端监听循环...")
consecutive_errors = 0
max_consecutive_errors = 10
try:
# 【关键】在线程中测试连接,不阻塞主线程
client = self.initializer.get_client()
if not client or not client.sock:
print("❌ 无法连接到SUWood服务器")
print("🔍 请检查:")
print(" 1. 服务器是否已启动")
print(" 2. 端口7999是否可用")
print(" 3. 防火墙设置")
return
print("✅ 已连接到SUWood服务器")
print("🎯 开始监听命令...")
# 修改后的代码,添加调试保存功能
while self.running:
try:
# 获取命令
commands = self.initializer.get_cmds()
# 【新增】调试功能:保存命令数据到桌面
if commands and len(commands) > 0:
self._save_commands_to_desktop(commands)
if commands and len(commands) > 0:
print(f"\n📥 收到 {len(commands)} 个命令")
consecutive_errors = 0 # 重置错误计数
# 处理每个命令
# for cmd in commands:
# if not self.running:
# break
# self._process_single_command(cmd)
# 短暂休眠避免过度占用CPU
time.sleep(0.1)
except KeyboardInterrupt:
print("\n🛑 收到中断信号,退出客户端循环")
break
except Exception as e:
consecutive_errors += 1
print(
f"❌ 客户端循环异常 ({consecutive_errors}/{max_consecutive_errors}): {e}")
if consecutive_errors >= max_consecutive_errors:
print(f"💀 连续错误过多,退出客户端循环")
break
# 错误后稍长休眠
time.sleep(1)
except Exception as e:
print(f"❌ 客户端线程异常: {e}")
print("🏁 客户端循环结束")
def _process_single_command(self, cmd: Dict[str, Any]):
"""处理单个命令"""
cmd_type = None
cmd_data = {}
# 解析不同的命令格式
if 'type' in cmd:
cmd_type = cmd.get('type')
cmd_data = cmd.get('data', {})
elif 'cmd' in cmd:
cmd_type = cmd.get('cmd')
cmd_data = cmd.get('data', {}) if 'data' in cmd else {
k: v for k, v in cmd.items() if k != 'cmd'}
elif isinstance(cmd, dict):
# 尝试其他可能的键名
for key in ['command', 'action', 'method']:
if key in cmd:
cmd_type = cmd.get(key)
cmd_data = {k: v for k, v in cmd.items() if k != key}
break
if cmd_type:
self.command_handler.handle_command(cmd_type, cmd_data)
else:
print(f"⚠️ 无法解析命令格式: {cmd}")
def get_client_stats(self) -> Dict[str, Any]:
"""获取客户端状态"""
return {
'running': self.running,
'thread_alive': self.client_thread.is_alive() if self.client_thread else False,
'command_stats': self.command_handler.get_stats(),
'architecture': '模块化' if self.initializer.use_modular else '原始'
}
def _save_commands_to_desktop(self, commands: List[Dict[str, Any]]):
"""保存命令数据到桌面调试文件 - 增强解析版本"""
try:
import os
import json
from datetime import datetime
# 获取桌面路径
desktop_path = os.path.join(os.path.expanduser("~"), "Desktop")
# 生成基础文件名和详细解析文件名
base_filename = f"接收文件{self.debug_file_counter:03d}"
raw_file_path = os.path.join(desktop_path, f"{base_filename}_原始数据.txt")
parsed_file_path = os.path.join(
desktop_path, f"{base_filename}_解析数据.txt")
# ========== 保存原始数据文件 ==========
self._save_raw_data_file(commands, raw_file_path, base_filename)
# ========== 保存详细解析文件 ==========
self._save_parsed_data_file(commands, parsed_file_path, base_filename)
print(f"💾 调试数据已保存:")
print(f" 📄 原始数据: {raw_file_path}")
print(f" 📋 解析数据: {parsed_file_path}")
# 递增文件计数器
self.debug_file_counter += 1
except Exception as e:
print(f"❌ 保存调试文件失败: {e}")
import traceback
traceback.print_exc()
def _save_raw_data_file(self, commands: List[Dict[str, Any]], file_path: str, base_filename: str):
"""保存原始JSON数据文件"""
from datetime import datetime
import json
content_lines = []
content_lines.append("=" * 80)
content_lines.append(f"SUWood命令原始数据文件 - {base_filename}")
content_lines.append(
f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}")
content_lines.append(f"命令数量: {len(commands)}")
content_lines.append("=" * 80)
content_lines.append("")
for i, cmd in enumerate(commands, 1):
content_lines.append(f"【命令 {i}/{len(commands)}")
content_lines.append("-" * 50)
json_str = json.dumps(cmd, ensure_ascii=False, indent=2)
content_lines.append(json_str)
content_lines.append("")
with open(file_path, 'w', encoding='utf-8') as f:
f.write('\n'.join(content_lines))
def _save_parsed_data_file(self, commands: List[Dict[str, Any]], file_path: str, base_filename: str):
"""保存详细解析数据文件"""
from datetime import datetime
content_lines = []
content_lines.append("=" * 100)
content_lines.append(f"SUWwood命令详细解析文件 - {base_filename}")
content_lines.append(
f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}")
content_lines.append(f"命令数量: {len(commands)}")
content_lines.append("=" * 100)
content_lines.append("")
# 添加概览统计
content_lines.extend(self._generate_overview_stats(commands))
content_lines.append("")
# 详细解析每个命令
for i, cmd in enumerate(commands, 1):
content_lines.extend(self._parse_single_command(cmd, i, len(commands)))
content_lines.append("")
# 添加总结分析
content_lines.extend(self._generate_summary_analysis(commands))
with open(file_path, 'w', encoding='utf-8') as f:
f.write('\n'.join(content_lines))
def _generate_overview_stats(self, commands: List[Dict[str, Any]]) -> List[str]:
"""生成概览统计信息"""
lines = []
lines.append("📊 【概览统计】")
lines.append("=" * 60)
# 统计命令类型
cmd_types = {}
for cmd in commands:
cmd_type = self._extract_command_type(cmd)
cmd_types[cmd_type] = cmd_types.get(cmd_type, 0) + 1
lines.append(f"📋 命令类型分布 (共 {len(commands)} 个命令):")
for cmd_type, count in sorted(cmd_types.items()):
percentage = (count / len(commands)) * 100
lines.append(f"{cmd_type}: {count}次 ({percentage:.1f}%)")
# 统计板材数量
total_boards = 0
for cmd in commands:
board_count = self._extract_board_count(cmd)
total_boards += board_count
lines.append(f"🏗️ 总板材数量: {total_boards}")
lines.append(f"📈 平均每命令板材数: {total_boards/len(commands):.1f}")
return lines
def _parse_single_command(self, cmd: Dict[str, Any], index: int, total: int) -> List[str]:
"""详细解析单个命令"""
lines = []
lines.append(f"🔍 【命令详细解析 {index}/{total}")
lines.append("-" * 80)
# 基本信息
cmd_type = self._extract_command_type(cmd)
lines.append(f"📝 命令类型: {cmd_type}")
# 提取数据部分
data_part = cmd.get('data', cmd)
if isinstance(data_part, dict):
lines.append("📋 基本参数:")
# 解析基本参数
basic_params = ['cmd', 'cp', 'uid', 'pid', 'zid', 'layer', 'drw']
for param in basic_params:
if param in data_part:
value = data_part[param]
description = self._get_param_description(param, value)
lines.append(f"{param}: {value} {description}")
# 解析板材信息
if 'finals' in data_part:
lines.extend(self._parse_board_details(data_part['finals']))
# 解析其他特殊字段
other_fields = {k: v for k, v in data_part.items()
if k not in basic_params + ['finals']}
if other_fields:
lines.append("🔧 其他参数:")
for key, value in other_fields.items():
lines.append(f"{key}: {value}")
return lines
def _parse_board_details(self, finals: Any) -> List[str]:
"""解析板材详细信息"""
lines = []
if not isinstance(finals, list):
lines.append(f"⚠️ 板材数据格式异常: {type(finals)}")
return lines
lines.append(f"🏗️ 板材信息 (共 {len(finals)} 块):")
for i, board in enumerate(finals[:5]): # 显示前5块板材
lines.append(f" 📦 板材 {i+1}:")
if isinstance(board, dict):
# 板材基本信息
board_info = []
if 'ckey' in board:
board_info.append(f"材质: {board['ckey']}")
if 'typ' in board:
board_info.append(f"类型: {board['typ']}")
if 'pos' in board:
board_info.append(f"位置: {board['pos']}")
if 'antiz' in board:
board_info.append(f"防撞: {board['antiz']}")
if board_info:
lines.append(f" {' | '.join(board_info)}")
# 解析几何信息
if 'obv' in board:
obv_info = self._parse_geometry_info(board['obv'], "正面")
lines.append(f" {obv_info}")
if 'rev' in board:
rev_info = self._parse_geometry_info(board['rev'], "背面")
lines.append(f" {rev_info}")
else:
lines.append(f" ⚠️ 格式异常: {type(board)}")
if len(finals) > 5:
lines.append(f" ... 还有 {len(finals) - 5} 块板材未显示")
return lines
def _parse_geometry_info(self, geo_data: Dict[str, Any], face_name: str) -> str:
"""解析几何信息"""
if not isinstance(geo_data, dict):
return f"{face_name}: 格式异常"
info_parts = [f"{face_name}几何"]
if 'segs' in geo_data:
segs = geo_data['segs']
if isinstance(segs, list):
info_parts.append(f"轮廓点数: {len(segs)}")
# 计算尺寸
try:
coords = []
for seg in segs:
if isinstance(seg, list) and len(seg) > 0:
coord_str = seg[0].strip('()')
x, y, z = map(float, coord_str.split(','))
coords.append((x, y, z))
if coords:
min_x = min(c[0] for c in coords)
max_x = max(c[0] for c in coords)
min_y = min(c[1] for c in coords)
max_y = max(c[1] for c in coords)
min_z = min(c[2] for c in coords)
max_z = max(c[2] for c in coords)
width = max_x - min_x
depth = max_y - min_y
height = max_z - min_z
info_parts.append(
f"尺寸: {width:.1f}×{depth:.1f}×{height:.1f}mm")
except:
info_parts.append("尺寸: 解析失败")
if 'vx' in geo_data:
info_parts.append(f"X轴: {geo_data['vx']}")
if 'vz' in geo_data:
info_parts.append(f"Z轴: {geo_data['vz']}")
return " | ".join(info_parts)
def _generate_summary_analysis(self, commands: List[Dict[str, Any]]) -> List[str]:
"""生成总结分析"""
lines = []
lines.append("📈 【总结分析】")
lines.append("=" * 60)
# 命令时序分析
lines.append("⏱️ 命令时序特征:")
if len(commands) > 1:
lines.append(f" • 连续命令数量: {len(commands)}")
lines.append(f" • 可能的批量操作: {'' if len(commands) > 3 else ''}")
# 潜在风险分析
lines.append("⚠️ 潜在风险评估:")
total_boards = sum(self._extract_board_count(cmd) for cmd in commands)
if total_boards > 10:
lines.append(f" • 高风险: 大量板材创建 ({total_boards}块)")
lines.append(" • 建议: 监控内存使用和依赖图状态")
elif total_boards > 5:
lines.append(f" • 中等风险: 中量板材创建 ({total_boards}块)")
else:
lines.append(f" • 低风险: 少量板材创建 ({total_boards}块)")
# 操作建议
lines.append("💡 操作建议:")
if any(self._extract_command_type(cmd) == 'c04' for cmd in commands):
lines.append(" • 监控Blender内存使用情况")
lines.append(" • 注意依赖图操作重复警告")
lines.append(" • 如出现崩溃检查是否超过1152对象限制")
from datetime import datetime
lines.append("")
lines.append(f"📅 分析生成时间: {datetime.now().isoformat()}")
lines.append("=" * 60)
return lines
def _extract_command_type(self, cmd: Dict[str, Any]) -> str:
"""提取命令类型"""
return cmd.get('cmd', cmd.get('type', cmd.get('data', {}).get('cmd', '未知')))
def _extract_board_count(self, cmd: Dict[str, Any]) -> int:
"""提取板材数量"""
data_part = cmd.get('data', cmd)
if isinstance(data_part, dict) and 'finals' in data_part:
finals = data_part['finals']
if isinstance(finals, list):
return len(finals)
return 0
def _get_param_description(self, param: str, value: Any) -> str:
"""获取参数描述"""
descriptions = {
'cmd': '(命令类型)',
'cp': '(组件索引)',
'uid': '(唯一标识)',
'pid': '(父级ID)',
'zid': '(区域ID)',
'layer': '(图层)',
'drw': '(绘制标志)',
}
return descriptions.get(param, '')
# ==================== 主启动函数 ====================
def start_suwood_blender_client():
"""启动SUWood Blender客户端 - 完全非阻塞版本"""
print("🎬 开始启动SUWood Blender客户端...")
# 【修复】将整个初始化过程移到后台线程
def background_init_and_start():
try:
# 1. 在后台线程中初始化SUWood系统
initializer = SUWoodInitializer()
if not initializer.initialize():
print("💀 初始化失败,无法启动客户端")
return
# 2. 创建客户端管理器
client_manager = SUWoodClientManager(initializer)
# 3. 启动客户端(已经在后台线程中)
if not client_manager.start_client():
print("💀 客户端启动失败")
return
print("🎉 SUWood Blender客户端启动成功!")
print("📋 系统信息:")
print(f" 🏗️ 架构: {'模块化' if initializer.use_modular else '原始'}")
print(f" 🔗 服务器: 已连接")
print(f" 🧵 客户端线程: 运行中")
print("\n💡 现在可以从SUWood服务器发送命令来在Blender中绘制模型了!")
# 设为全局变量
globals()['suwood_client'] = client_manager
except Exception as e:
print(f"❌ 后台初始化失败: {e}")
# 启动后台初始化线程
init_thread = threading.Thread(
target=background_init_and_start, daemon=True)
init_thread.start()
print("🚀 SUWood客户端正在后台初始化...")
print("💡 Blender界面保持响应请等待初始化完成")
# 立即返回,不阻塞主线程
return None # 客户端管理器将在后台设置为全局变量
# ==================== 辅助函数 ====================
def print_system_status(client_manager: Optional[SUWoodClientManager] = None):
"""打印系统状态"""
print("\n" + "="*50)
print("📊 SUWood Blender客户端状态")
print("="*50)
if client_manager:
stats = client_manager.get_client_stats()
print(f"🔄 运行状态: {'✅ 运行中' if stats['running'] else '❌ 已停止'}")
print(f"🧵 线程状态: {'✅ 活跃' if stats['thread_alive'] else '❌ 停止'}")
print(f"🏗️ 架构模式: {stats['architecture']}")
cmd_stats = stats['command_stats']
print(f"📈 命令统计:")
print(f" 总计: {cmd_stats['total']}")
print(f" 成功: {cmd_stats['success']}")
print(f" 失败: {cmd_stats['failed']}")
if cmd_stats['total'] > 0:
success_rate = cmd_stats['success'] / cmd_stats['total'] * 100
print(f" 成功率: {success_rate:.1f}%")
if cmd_stats['last_command_time']:
print(f" 最后命令: {cmd_stats['last_command_time']}")
else:
print("❌ 客户端未启动")
print("="*50)
def show_usage_guide():
"""显示使用指南"""
print("\n" + "="*50)
print("📖 SUWood Blender客户端使用指南")
print("="*50)
print("1⃣ 启动客户端:")
print(" client_manager = start_suwood_blender_client()")
print("")
print("2⃣ 查看状态:")
print(" print_system_status(client_manager)")
print("")
print("3⃣ 停止客户端:")
print(" client_manager.stop_client()")
print("")
print("4⃣ 支持的命令类型:")
print(" - c04: 创建部件")
print(" - c05: 创建加工")
print(" - c08: 创建五金")
print(" - c09: 删除实体")
print(" - c0a: 删除加工")
print(" - c07: 创建尺寸标注")
print(" - 以及更多...")
print("")
print("💡 提示: 命令将从SUWood服务器自动接收并在Blender中执行")
print("="*50)
# ==================== 自动启动 ====================
if __name__ == "__main__":
# 显示使用指南
show_usage_guide()
# 自动启动客户端
print("\n🚀 自动启动客户端...")
client_manager = start_suwood_blender_client()
if client_manager:
try:
print("\n🔄 客户端运行中...")
print("💡 按 Ctrl+C 停止客户端")
# 定期显示状态
while True:
time.sleep(30) # 每30秒显示一次状态
print_system_status(client_manager)
except KeyboardInterrupt:
print("\n🛑 收到停止信号...")
client_manager.stop_client()
print("👋 再见!")