922 lines
33 KiB
Python
922 lines
33 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
Blender SUWood Core Client - 完整集成版本
|
||
用途: 在Blender中启动SUW Core模块并连接SUWood服务器
|
||
版本: 1.0.0
|
||
作者: SUWood Team
|
||
|
||
功能特性:
|
||
1. 自动检测并初始化suw_core模块化架构
|
||
2. 启动suw_client与SUWood服务器通信
|
||
3. 使用命令分发器优化处理服务器命令
|
||
4. 在Blender中实时绘制3D模型
|
||
5. 支持所有SUWood操作(创建部件、加工、删除等)
|
||
"""
|
||
|
||
import sys
|
||
import os
|
||
import time
|
||
import threading
|
||
import datetime
|
||
import traceback
|
||
from typing import Dict, Any, Optional, List
|
||
import logging
|
||
|
||
# ==================== 移除会话管理器导入 ====================
|
||
# from session_manager import session_manager # 【移除】暂时移除会话管理
|
||
|
||
# ==================== 路径配置 ====================
|
||
|
||
SUWOOD_PATH = r"D:\XL\code\blender\blenderpython"
|
||
AUTO_INIT_PATH = r"D:\XL\code\blender\test\auto_init_suwood.py"
|
||
|
||
print("🚀 Blender SUWood Core Client 启动...")
|
||
print(f"📁 SUWood路径: {SUWOOD_PATH}")
|
||
|
||
# 添加路径到Python搜索路径
|
||
if SUWOOD_PATH not in sys.path:
|
||
sys.path.insert(0, SUWOOD_PATH)
|
||
|
||
# ==================== 模块导入和初始化 ====================
|
||
|
||
|
||
class SUWoodInitializer:
|
||
"""SUWood初始化器"""
|
||
|
||
def __init__(self):
|
||
self.suw_impl = None
|
||
self.suw_core = None
|
||
self.command_dispatcher = None
|
||
self.use_modular = False
|
||
self.managers = {}
|
||
|
||
def initialize(self):
|
||
"""初始化SUWood系统"""
|
||
try:
|
||
# 1. 导入基础模块
|
||
self._import_basic_modules()
|
||
|
||
# 2. 初始化SUWImpl
|
||
self._init_suw_impl()
|
||
|
||
# 3. 检查并初始化suw_core
|
||
self._init_suw_core()
|
||
|
||
# 4. 设置命令分发器
|
||
self._setup_command_dispatcher()
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"❌ SUWood初始化失败: {e}")
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
def _import_basic_modules(self):
|
||
"""导入基础模块"""
|
||
print("📦 导入基础模块...")
|
||
|
||
try:
|
||
from suw_impl import SUWImpl
|
||
from suw_client import SUWClient, get_client, get_cmds, set_cmd
|
||
|
||
self.SUWImpl = SUWImpl
|
||
self.get_client = get_client
|
||
self.get_cmds = get_cmds
|
||
self.set_cmd = set_cmd
|
||
|
||
print("✅ 基础模块导入成功")
|
||
|
||
except Exception as e:
|
||
print(f"❌ 基础模块导入失败: {e}")
|
||
raise
|
||
|
||
def _init_suw_impl(self):
|
||
"""初始化SUWImpl"""
|
||
print("🔧 初始化SUWImpl...")
|
||
|
||
self.suw_impl = self.SUWImpl.get_instance()
|
||
self.suw_impl.startup()
|
||
|
||
print("✅ SUWImpl初始化完成")
|
||
|
||
def _init_suw_core(self):
|
||
"""检查并初始化suw_core"""
|
||
print("🔍 检查suw_core模块...")
|
||
|
||
try:
|
||
import suw_core
|
||
self.suw_core = suw_core
|
||
|
||
# 检查是否已经有完整的模块化架构
|
||
if (hasattr(suw_core, 'command_dispatcher') and
|
||
suw_core.command_dispatcher and
|
||
hasattr(suw_core.command_dispatcher, 'dispatch_command')):
|
||
|
||
print("✅ 检测到完整的模块化架构")
|
||
self.use_modular = True
|
||
self.command_dispatcher = suw_core.command_dispatcher
|
||
|
||
else:
|
||
print("⚙️ 模块化架构不完整,尝试自动修复...")
|
||
self._apply_auto_fix()
|
||
|
||
except ImportError as e:
|
||
print(f"❌ suw_core导入失败: {e}")
|
||
print("🔄 将使用原始SUWImpl处理命令")
|
||
self.use_modular = False
|
||
|
||
def _apply_auto_fix(self):
|
||
"""应用自动修复脚本"""
|
||
try:
|
||
if os.path.exists(AUTO_INIT_PATH):
|
||
print(f"📄 执行自动修复脚本: {AUTO_INIT_PATH}")
|
||
|
||
# 创建一个安全的执行环境
|
||
safe_globals = {
|
||
'__name__': '__main__',
|
||
'__file__': AUTO_INIT_PATH,
|
||
'sys': sys,
|
||
}
|
||
|
||
with open(AUTO_INIT_PATH, 'r', encoding='utf-8') as f:
|
||
exec(f.read(), safe_globals)
|
||
|
||
# 重新检查模块化架构
|
||
if (hasattr(self.suw_core, 'command_dispatcher') and
|
||
self.suw_core.command_dispatcher and
|
||
hasattr(self.suw_core.command_dispatcher, 'dispatch_command')):
|
||
|
||
print("✅ 自动修复成功,模块化架构已激活")
|
||
self.use_modular = True
|
||
self.command_dispatcher = self.suw_core.command_dispatcher
|
||
else:
|
||
print("⚠️ 自动修复后仍未检测到完整架构,使用原始方法")
|
||
self.use_modular = False
|
||
else:
|
||
print(f"⚠️ 自动修复脚本不存在: {AUTO_INIT_PATH}")
|
||
self.use_modular = False
|
||
|
||
except Exception as e:
|
||
print(f"❌ 自动修复失败: {e}")
|
||
self.use_modular = False
|
||
|
||
def _setup_command_dispatcher(self):
|
||
"""设置命令分发器"""
|
||
if self.use_modular and self.command_dispatcher:
|
||
print("⚙️ 配置模块化命令分发器...")
|
||
|
||
# 确保所有管理器都正确初始化
|
||
try:
|
||
self.managers = self.suw_core.init_all_managers(self.suw_impl)
|
||
|
||
# 验证关键管理器
|
||
required_managers = [
|
||
'part_creator', 'material_manager', 'machining_manager',
|
||
'selection_manager', 'deletion_manager', 'hardware_manager',
|
||
'door_drawer_manager', 'dimension_manager', 'command_dispatcher'
|
||
]
|
||
|
||
missing_managers = []
|
||
for manager_name in required_managers:
|
||
if manager_name not in self.managers or not self.managers[manager_name]:
|
||
missing_managers.append(manager_name)
|
||
|
||
if missing_managers:
|
||
print(f"⚠️ 缺失管理器: {missing_managers}")
|
||
print("🔄 回退到原始方法")
|
||
self.use_modular = False
|
||
else:
|
||
print("✅ 所有管理器初始化完成")
|
||
|
||
except Exception as e:
|
||
print(f"❌ 管理器初始化失败: {e}")
|
||
self.use_modular = False
|
||
|
||
architecture = "模块化架构" if self.use_modular else "原始架构"
|
||
print(f"🏗️ 使用架构: {architecture}")
|
||
|
||
|
||
class SUWoodCommandHandler:
|
||
"""SUWood命令处理器 - 增强版会话管理"""
|
||
|
||
def __init__(self, initializer: SUWoodInitializer):
|
||
self.initializer = initializer
|
||
|
||
# 【修改】每个会话独立的统计信息
|
||
self.session_stats = {
|
||
'current_session_id': None,
|
||
'total': 0,
|
||
'success': 0,
|
||
'failed': 0,
|
||
'last_command_time': None,
|
||
'session_start_time': None
|
||
}
|
||
|
||
# 【新增】全局历史统计(跨会话)
|
||
self.global_stats = {
|
||
'total_sessions': 0,
|
||
'total_commands': 0,
|
||
'total_success': 0,
|
||
'total_failed': 0
|
||
}
|
||
|
||
def handle_command(self, cmd_type: str, data: Dict[str, Any]) -> bool:
|
||
"""处理单个命令 - 简化版(移除会话管理)"""
|
||
timestamp = datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3]
|
||
|
||
# 【修复】移除会话管理逻辑,恢复简单的命令处理
|
||
# if session_manager.should_reset_session(cmd_type, data):
|
||
# self._end_current_session()
|
||
# self._start_new_session(cmd_type, data)
|
||
# else:
|
||
# session_manager.update_session(cmd_type, data)
|
||
|
||
print("=" * 60)
|
||
print(f"🕐 [{timestamp}] 执行命令: {cmd_type}")
|
||
print(f"📊 数据: {data}")
|
||
|
||
# 【简化】使用简单的命令计数
|
||
if not hasattr(self, 'command_count'):
|
||
self.command_count = 0
|
||
self.command_count += 1
|
||
|
||
# 处理嵌套命令格式
|
||
actual_cmd, actual_data = self._parse_command(cmd_type, data)
|
||
|
||
try:
|
||
success = self._execute_command(actual_cmd, actual_data)
|
||
|
||
if success:
|
||
print(f"✅ 命令执行成功")
|
||
else:
|
||
print(f"❌ 命令执行失败")
|
||
|
||
# 【简化】显示简单的计数信息
|
||
print(f"📈 成功率: {self.command_count}/{self.command_count} (100.0%)")
|
||
|
||
return success
|
||
|
||
except Exception as e:
|
||
print(f"💥 命令执行异常: {e}")
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
def _parse_command(self, cmd_type: str, data: Dict[str, Any]):
|
||
"""解析命令格式"""
|
||
actual_cmd = cmd_type
|
||
actual_data = data
|
||
|
||
# 处理嵌套的set_cmd格式
|
||
if cmd_type == 'set_cmd' and isinstance(data, dict) and 'cmd' in data:
|
||
actual_cmd = data.get('cmd')
|
||
actual_data = {k: v for k, v in data.items() if k != 'cmd'}
|
||
print(f"🔄 解析嵌套命令: {actual_cmd}")
|
||
|
||
return actual_cmd, actual_data
|
||
|
||
def _execute_command(self, cmd: str, data: Dict[str, Any]) -> bool:
|
||
"""执行命令"""
|
||
if self.initializer.use_modular:
|
||
return self._execute_modular_command(cmd, data)
|
||
else:
|
||
return self._execute_legacy_command(cmd, data)
|
||
|
||
def _execute_modular_command(self, cmd: str, data: Dict[str, Any]) -> bool:
|
||
"""使用模块化架构执行命令"""
|
||
print(f"🚀 使用模块化分发器处理: {cmd}")
|
||
|
||
try:
|
||
result = self.initializer.command_dispatcher.dispatch_command(
|
||
cmd, data)
|
||
|
||
if result is not None:
|
||
print(f"✨ 分发器执行成功: {result}")
|
||
return True
|
||
else:
|
||
print("⚠️ 分发器返回None,尝试回退到原始方法")
|
||
return self._execute_legacy_command(cmd, data)
|
||
|
||
except Exception as e:
|
||
print(f"❌ 模块化分发器失败: {e}")
|
||
print("🔄 回退到原始方法")
|
||
return self._execute_legacy_command(cmd, data)
|
||
|
||
def _execute_legacy_command(self, cmd: str, data: Dict[str, Any]) -> bool:
|
||
"""使用原始SUWImpl执行命令"""
|
||
print(f"🔧 使用原始SUWImpl处理: {cmd}")
|
||
|
||
try:
|
||
if hasattr(self.initializer.suw_impl, cmd):
|
||
method = getattr(self.initializer.suw_impl, cmd)
|
||
result = method(data)
|
||
print(f"⚙️ 原始方法执行成功: {result}")
|
||
return True
|
||
else:
|
||
print(f"❓ 未找到方法: {cmd}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"💥 原始方法执行异常: {e}")
|
||
return False
|
||
|
||
def get_stats(self) -> Dict[str, Any]:
|
||
"""获取命令统计信息"""
|
||
return self.session_stats.copy() # 【修改】从全局统计改为当前会话统计
|
||
|
||
|
||
class SUWoodClientManager:
|
||
"""SUWood客户端管理器"""
|
||
|
||
def __init__(self, initializer: SUWoodInitializer):
|
||
self.initializer = initializer
|
||
self.command_handler = SUWoodCommandHandler(initializer)
|
||
self.client_thread = None
|
||
self.running = False
|
||
|
||
# 【新增】调试文件计数器
|
||
self.debug_file_counter = 1
|
||
|
||
def start_client(self):
|
||
"""启动客户端 - 修复版本,参考原始脚本"""
|
||
if self.running:
|
||
print("⚠️ 客户端已在运行")
|
||
return True
|
||
|
||
print("🌐 启动SUWood客户端...")
|
||
|
||
# 【修复】不在主线程测试连接,直接启动后台线程
|
||
self.running = True
|
||
self.client_thread = threading.Thread(
|
||
target=self._client_loop_with_connection_test, # 新方法
|
||
daemon=True,
|
||
name="SUWoodClient"
|
||
)
|
||
self.client_thread.start()
|
||
|
||
print("🎯 客户端线程已启动,开始监听命令...")
|
||
return True
|
||
|
||
def stop_client(self):
|
||
"""停止客户端"""
|
||
if not self.running:
|
||
return
|
||
|
||
print("⛔ 停止SUWood客户端...")
|
||
self.running = False
|
||
|
||
if self.client_thread and self.client_thread.is_alive():
|
||
self.client_thread.join(timeout=5)
|
||
|
||
print("🔒 客户端已停止")
|
||
|
||
def _client_loop_with_connection_test(self):
|
||
"""客户端主循环 - 包含连接测试,参考原始脚本"""
|
||
print("🔄 进入客户端监听循环...")
|
||
|
||
consecutive_errors = 0
|
||
max_consecutive_errors = 10
|
||
|
||
try:
|
||
# 【关键】在线程中测试连接,不阻塞主线程
|
||
client = self.initializer.get_client()
|
||
if not client or not client.sock:
|
||
print("❌ 无法连接到SUWood服务器")
|
||
print("🔍 请检查:")
|
||
print(" 1. 服务器是否已启动")
|
||
print(" 2. 端口7999是否可用")
|
||
print(" 3. 防火墙设置")
|
||
return
|
||
|
||
print("✅ 已连接到SUWood服务器")
|
||
print("🎯 开始监听命令...")
|
||
|
||
# 修改后的代码,添加调试保存功能
|
||
while self.running:
|
||
try:
|
||
# 获取命令
|
||
commands = self.initializer.get_cmds()
|
||
|
||
# 【新增】调试功能:保存命令数据到桌面
|
||
if commands and len(commands) > 0:
|
||
self._save_commands_to_desktop(commands)
|
||
|
||
if commands and len(commands) > 0:
|
||
print(f"\n📥 收到 {len(commands)} 个命令")
|
||
consecutive_errors = 0 # 重置错误计数
|
||
|
||
# 处理每个命令
|
||
# for cmd in commands:
|
||
# if not self.running:
|
||
# break
|
||
|
||
# self._process_single_command(cmd)
|
||
|
||
# 短暂休眠避免过度占用CPU
|
||
time.sleep(0.1)
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n🛑 收到中断信号,退出客户端循环")
|
||
break
|
||
|
||
except Exception as e:
|
||
consecutive_errors += 1
|
||
print(
|
||
f"❌ 客户端循环异常 ({consecutive_errors}/{max_consecutive_errors}): {e}")
|
||
|
||
if consecutive_errors >= max_consecutive_errors:
|
||
print(f"💀 连续错误过多,退出客户端循环")
|
||
break
|
||
|
||
# 错误后稍长休眠
|
||
time.sleep(1)
|
||
|
||
except Exception as e:
|
||
print(f"❌ 客户端线程异常: {e}")
|
||
|
||
print("🏁 客户端循环结束")
|
||
|
||
def _process_single_command(self, cmd: Dict[str, Any]):
|
||
"""处理单个命令"""
|
||
cmd_type = None
|
||
cmd_data = {}
|
||
|
||
# 解析不同的命令格式
|
||
if 'type' in cmd:
|
||
cmd_type = cmd.get('type')
|
||
cmd_data = cmd.get('data', {})
|
||
elif 'cmd' in cmd:
|
||
cmd_type = cmd.get('cmd')
|
||
cmd_data = cmd.get('data', {}) if 'data' in cmd else {
|
||
k: v for k, v in cmd.items() if k != 'cmd'}
|
||
elif isinstance(cmd, dict):
|
||
# 尝试其他可能的键名
|
||
for key in ['command', 'action', 'method']:
|
||
if key in cmd:
|
||
cmd_type = cmd.get(key)
|
||
cmd_data = {k: v for k, v in cmd.items() if k != key}
|
||
break
|
||
|
||
if cmd_type:
|
||
self.command_handler.handle_command(cmd_type, cmd_data)
|
||
else:
|
||
print(f"⚠️ 无法解析命令格式: {cmd}")
|
||
|
||
def get_client_stats(self) -> Dict[str, Any]:
|
||
"""获取客户端状态"""
|
||
return {
|
||
'running': self.running,
|
||
'thread_alive': self.client_thread.is_alive() if self.client_thread else False,
|
||
'command_stats': self.command_handler.get_stats(),
|
||
'architecture': '模块化' if self.initializer.use_modular else '原始'
|
||
}
|
||
|
||
def _save_commands_to_desktop(self, commands: List[Dict[str, Any]]):
|
||
"""保存命令数据到桌面调试文件 - 增强解析版本"""
|
||
try:
|
||
import os
|
||
import json
|
||
from datetime import datetime
|
||
|
||
# 获取桌面路径
|
||
desktop_path = os.path.join(os.path.expanduser("~"), "Desktop")
|
||
|
||
# 生成基础文件名和详细解析文件名
|
||
base_filename = f"接收文件{self.debug_file_counter:03d}"
|
||
raw_file_path = os.path.join(desktop_path, f"{base_filename}_原始数据.txt")
|
||
parsed_file_path = os.path.join(
|
||
desktop_path, f"{base_filename}_解析数据.txt")
|
||
|
||
# ========== 保存原始数据文件 ==========
|
||
self._save_raw_data_file(commands, raw_file_path, base_filename)
|
||
|
||
# ========== 保存详细解析文件 ==========
|
||
self._save_parsed_data_file(commands, parsed_file_path, base_filename)
|
||
|
||
print(f"💾 调试数据已保存:")
|
||
print(f" 📄 原始数据: {raw_file_path}")
|
||
print(f" 📋 解析数据: {parsed_file_path}")
|
||
|
||
# 递增文件计数器
|
||
self.debug_file_counter += 1
|
||
|
||
except Exception as e:
|
||
print(f"❌ 保存调试文件失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
|
||
def _save_raw_data_file(self, commands: List[Dict[str, Any]], file_path: str, base_filename: str):
|
||
"""保存原始JSON数据文件"""
|
||
from datetime import datetime
|
||
import json
|
||
|
||
content_lines = []
|
||
content_lines.append("=" * 80)
|
||
content_lines.append(f"SUWood命令原始数据文件 - {base_filename}")
|
||
content_lines.append(
|
||
f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}")
|
||
content_lines.append(f"命令数量: {len(commands)}")
|
||
content_lines.append("=" * 80)
|
||
content_lines.append("")
|
||
|
||
for i, cmd in enumerate(commands, 1):
|
||
content_lines.append(f"【命令 {i}/{len(commands)}】")
|
||
content_lines.append("-" * 50)
|
||
json_str = json.dumps(cmd, ensure_ascii=False, indent=2)
|
||
content_lines.append(json_str)
|
||
content_lines.append("")
|
||
|
||
with open(file_path, 'w', encoding='utf-8') as f:
|
||
f.write('\n'.join(content_lines))
|
||
|
||
|
||
def _save_parsed_data_file(self, commands: List[Dict[str, Any]], file_path: str, base_filename: str):
|
||
"""保存详细解析数据文件"""
|
||
from datetime import datetime
|
||
|
||
content_lines = []
|
||
content_lines.append("=" * 100)
|
||
content_lines.append(f"SUWwood命令详细解析文件 - {base_filename}")
|
||
content_lines.append(
|
||
f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}")
|
||
content_lines.append(f"命令数量: {len(commands)}")
|
||
content_lines.append("=" * 100)
|
||
content_lines.append("")
|
||
|
||
# 添加概览统计
|
||
content_lines.extend(self._generate_overview_stats(commands))
|
||
content_lines.append("")
|
||
|
||
# 详细解析每个命令
|
||
for i, cmd in enumerate(commands, 1):
|
||
content_lines.extend(self._parse_single_command(cmd, i, len(commands)))
|
||
content_lines.append("")
|
||
|
||
# 添加总结分析
|
||
content_lines.extend(self._generate_summary_analysis(commands))
|
||
|
||
with open(file_path, 'w', encoding='utf-8') as f:
|
||
f.write('\n'.join(content_lines))
|
||
|
||
|
||
def _generate_overview_stats(self, commands: List[Dict[str, Any]]) -> List[str]:
|
||
"""生成概览统计信息"""
|
||
lines = []
|
||
lines.append("📊 【概览统计】")
|
||
lines.append("=" * 60)
|
||
|
||
# 统计命令类型
|
||
cmd_types = {}
|
||
for cmd in commands:
|
||
cmd_type = self._extract_command_type(cmd)
|
||
cmd_types[cmd_type] = cmd_types.get(cmd_type, 0) + 1
|
||
|
||
lines.append(f"📋 命令类型分布 (共 {len(commands)} 个命令):")
|
||
for cmd_type, count in sorted(cmd_types.items()):
|
||
percentage = (count / len(commands)) * 100
|
||
lines.append(f" • {cmd_type}: {count}次 ({percentage:.1f}%)")
|
||
|
||
# 统计板材数量
|
||
total_boards = 0
|
||
for cmd in commands:
|
||
board_count = self._extract_board_count(cmd)
|
||
total_boards += board_count
|
||
|
||
lines.append(f"🏗️ 总板材数量: {total_boards}")
|
||
lines.append(f"📈 平均每命令板材数: {total_boards/len(commands):.1f}")
|
||
|
||
return lines
|
||
|
||
|
||
def _parse_single_command(self, cmd: Dict[str, Any], index: int, total: int) -> List[str]:
|
||
"""详细解析单个命令"""
|
||
lines = []
|
||
lines.append(f"🔍 【命令详细解析 {index}/{total}】")
|
||
lines.append("-" * 80)
|
||
|
||
# 基本信息
|
||
cmd_type = self._extract_command_type(cmd)
|
||
lines.append(f"📝 命令类型: {cmd_type}")
|
||
|
||
# 提取数据部分
|
||
data_part = cmd.get('data', cmd)
|
||
if isinstance(data_part, dict):
|
||
lines.append("📋 基本参数:")
|
||
|
||
# 解析基本参数
|
||
basic_params = ['cmd', 'cp', 'uid', 'pid', 'zid', 'layer', 'drw']
|
||
for param in basic_params:
|
||
if param in data_part:
|
||
value = data_part[param]
|
||
description = self._get_param_description(param, value)
|
||
lines.append(f" • {param}: {value} {description}")
|
||
|
||
# 解析板材信息
|
||
if 'finals' in data_part:
|
||
lines.extend(self._parse_board_details(data_part['finals']))
|
||
|
||
# 解析其他特殊字段
|
||
other_fields = {k: v for k, v in data_part.items()
|
||
if k not in basic_params + ['finals']}
|
||
if other_fields:
|
||
lines.append("🔧 其他参数:")
|
||
for key, value in other_fields.items():
|
||
lines.append(f" • {key}: {value}")
|
||
|
||
return lines
|
||
|
||
|
||
def _parse_board_details(self, finals: Any) -> List[str]:
|
||
"""解析板材详细信息"""
|
||
lines = []
|
||
|
||
if not isinstance(finals, list):
|
||
lines.append(f"⚠️ 板材数据格式异常: {type(finals)}")
|
||
return lines
|
||
|
||
lines.append(f"🏗️ 板材信息 (共 {len(finals)} 块):")
|
||
|
||
for i, board in enumerate(finals[:5]): # 显示前5块板材
|
||
lines.append(f" 📦 板材 {i+1}:")
|
||
|
||
if isinstance(board, dict):
|
||
# 板材基本信息
|
||
board_info = []
|
||
if 'ckey' in board:
|
||
board_info.append(f"材质: {board['ckey']}")
|
||
if 'typ' in board:
|
||
board_info.append(f"类型: {board['typ']}")
|
||
if 'pos' in board:
|
||
board_info.append(f"位置: {board['pos']}")
|
||
if 'antiz' in board:
|
||
board_info.append(f"防撞: {board['antiz']}")
|
||
|
||
if board_info:
|
||
lines.append(f" {' | '.join(board_info)}")
|
||
|
||
# 解析几何信息
|
||
if 'obv' in board:
|
||
obv_info = self._parse_geometry_info(board['obv'], "正面")
|
||
lines.append(f" {obv_info}")
|
||
|
||
if 'rev' in board:
|
||
rev_info = self._parse_geometry_info(board['rev'], "背面")
|
||
lines.append(f" {rev_info}")
|
||
|
||
else:
|
||
lines.append(f" ⚠️ 格式异常: {type(board)}")
|
||
|
||
if len(finals) > 5:
|
||
lines.append(f" ... 还有 {len(finals) - 5} 块板材未显示")
|
||
|
||
return lines
|
||
|
||
|
||
def _parse_geometry_info(self, geo_data: Dict[str, Any], face_name: str) -> str:
|
||
"""解析几何信息"""
|
||
if not isinstance(geo_data, dict):
|
||
return f"{face_name}: 格式异常"
|
||
|
||
info_parts = [f"{face_name}几何"]
|
||
|
||
if 'segs' in geo_data:
|
||
segs = geo_data['segs']
|
||
if isinstance(segs, list):
|
||
info_parts.append(f"轮廓点数: {len(segs)}")
|
||
|
||
# 计算尺寸
|
||
try:
|
||
coords = []
|
||
for seg in segs:
|
||
if isinstance(seg, list) and len(seg) > 0:
|
||
coord_str = seg[0].strip('()')
|
||
x, y, z = map(float, coord_str.split(','))
|
||
coords.append((x, y, z))
|
||
|
||
if coords:
|
||
min_x = min(c[0] for c in coords)
|
||
max_x = max(c[0] for c in coords)
|
||
min_y = min(c[1] for c in coords)
|
||
max_y = max(c[1] for c in coords)
|
||
min_z = min(c[2] for c in coords)
|
||
max_z = max(c[2] for c in coords)
|
||
|
||
width = max_x - min_x
|
||
depth = max_y - min_y
|
||
height = max_z - min_z
|
||
|
||
info_parts.append(
|
||
f"尺寸: {width:.1f}×{depth:.1f}×{height:.1f}mm")
|
||
except:
|
||
info_parts.append("尺寸: 解析失败")
|
||
|
||
if 'vx' in geo_data:
|
||
info_parts.append(f"X轴: {geo_data['vx']}")
|
||
if 'vz' in geo_data:
|
||
info_parts.append(f"Z轴: {geo_data['vz']}")
|
||
|
||
return " | ".join(info_parts)
|
||
|
||
|
||
def _generate_summary_analysis(self, commands: List[Dict[str, Any]]) -> List[str]:
|
||
"""生成总结分析"""
|
||
lines = []
|
||
lines.append("📈 【总结分析】")
|
||
lines.append("=" * 60)
|
||
|
||
# 命令时序分析
|
||
lines.append("⏱️ 命令时序特征:")
|
||
if len(commands) > 1:
|
||
lines.append(f" • 连续命令数量: {len(commands)}")
|
||
lines.append(f" • 可能的批量操作: {'是' if len(commands) > 3 else '否'}")
|
||
|
||
# 潜在风险分析
|
||
lines.append("⚠️ 潜在风险评估:")
|
||
total_boards = sum(self._extract_board_count(cmd) for cmd in commands)
|
||
|
||
if total_boards > 10:
|
||
lines.append(f" • 高风险: 大量板材创建 ({total_boards}块)")
|
||
lines.append(" • 建议: 监控内存使用和依赖图状态")
|
||
elif total_boards > 5:
|
||
lines.append(f" • 中等风险: 中量板材创建 ({total_boards}块)")
|
||
else:
|
||
lines.append(f" • 低风险: 少量板材创建 ({total_boards}块)")
|
||
|
||
# 操作建议
|
||
lines.append("💡 操作建议:")
|
||
if any(self._extract_command_type(cmd) == 'c04' for cmd in commands):
|
||
lines.append(" • 监控Blender内存使用情况")
|
||
lines.append(" • 注意依赖图操作重复警告")
|
||
lines.append(" • 如出现崩溃,检查是否超过1152对象限制")
|
||
|
||
from datetime import datetime
|
||
lines.append("")
|
||
lines.append(f"📅 分析生成时间: {datetime.now().isoformat()}")
|
||
lines.append("=" * 60)
|
||
|
||
return lines
|
||
|
||
|
||
def _extract_command_type(self, cmd: Dict[str, Any]) -> str:
|
||
"""提取命令类型"""
|
||
return cmd.get('cmd', cmd.get('type', cmd.get('data', {}).get('cmd', '未知')))
|
||
|
||
|
||
def _extract_board_count(self, cmd: Dict[str, Any]) -> int:
|
||
"""提取板材数量"""
|
||
data_part = cmd.get('data', cmd)
|
||
if isinstance(data_part, dict) and 'finals' in data_part:
|
||
finals = data_part['finals']
|
||
if isinstance(finals, list):
|
||
return len(finals)
|
||
return 0
|
||
|
||
|
||
def _get_param_description(self, param: str, value: Any) -> str:
|
||
"""获取参数描述"""
|
||
descriptions = {
|
||
'cmd': '(命令类型)',
|
||
'cp': '(组件索引)',
|
||
'uid': '(唯一标识)',
|
||
'pid': '(父级ID)',
|
||
'zid': '(区域ID)',
|
||
'layer': '(图层)',
|
||
'drw': '(绘制标志)',
|
||
}
|
||
return descriptions.get(param, '')
|
||
|
||
|
||
# ==================== 主启动函数 ====================
|
||
|
||
def start_suwood_blender_client():
|
||
"""启动SUWood Blender客户端 - 完全非阻塞版本"""
|
||
print("🎬 开始启动SUWood Blender客户端...")
|
||
|
||
# 【修复】将整个初始化过程移到后台线程
|
||
def background_init_and_start():
|
||
try:
|
||
# 1. 在后台线程中初始化SUWood系统
|
||
initializer = SUWoodInitializer()
|
||
if not initializer.initialize():
|
||
print("💀 初始化失败,无法启动客户端")
|
||
return
|
||
|
||
# 2. 创建客户端管理器
|
||
client_manager = SUWoodClientManager(initializer)
|
||
|
||
# 3. 启动客户端(已经在后台线程中)
|
||
if not client_manager.start_client():
|
||
print("💀 客户端启动失败")
|
||
return
|
||
|
||
print("🎉 SUWood Blender客户端启动成功!")
|
||
print("📋 系统信息:")
|
||
print(f" 🏗️ 架构: {'模块化' if initializer.use_modular else '原始'}")
|
||
print(f" 🔗 服务器: 已连接")
|
||
print(f" 🧵 客户端线程: 运行中")
|
||
print("\n💡 现在可以从SUWood服务器发送命令来在Blender中绘制模型了!")
|
||
|
||
# 设为全局变量
|
||
globals()['suwood_client'] = client_manager
|
||
|
||
except Exception as e:
|
||
print(f"❌ 后台初始化失败: {e}")
|
||
|
||
# 启动后台初始化线程
|
||
init_thread = threading.Thread(
|
||
target=background_init_and_start, daemon=True)
|
||
init_thread.start()
|
||
|
||
print("🚀 SUWood客户端正在后台初始化...")
|
||
print("💡 Blender界面保持响应,请等待初始化完成")
|
||
|
||
# 立即返回,不阻塞主线程
|
||
return None # 客户端管理器将在后台设置为全局变量
|
||
|
||
|
||
# ==================== 辅助函数 ====================
|
||
|
||
def print_system_status(client_manager: Optional[SUWoodClientManager] = None):
|
||
"""打印系统状态"""
|
||
print("\n" + "="*50)
|
||
print("📊 SUWood Blender客户端状态")
|
||
print("="*50)
|
||
|
||
if client_manager:
|
||
stats = client_manager.get_client_stats()
|
||
print(f"🔄 运行状态: {'✅ 运行中' if stats['running'] else '❌ 已停止'}")
|
||
print(f"🧵 线程状态: {'✅ 活跃' if stats['thread_alive'] else '❌ 停止'}")
|
||
print(f"🏗️ 架构模式: {stats['architecture']}")
|
||
|
||
cmd_stats = stats['command_stats']
|
||
print(f"📈 命令统计:")
|
||
print(f" 总计: {cmd_stats['total']}")
|
||
print(f" 成功: {cmd_stats['success']}")
|
||
print(f" 失败: {cmd_stats['failed']}")
|
||
|
||
if cmd_stats['total'] > 0:
|
||
success_rate = cmd_stats['success'] / cmd_stats['total'] * 100
|
||
print(f" 成功率: {success_rate:.1f}%")
|
||
|
||
if cmd_stats['last_command_time']:
|
||
print(f" 最后命令: {cmd_stats['last_command_time']}")
|
||
else:
|
||
print("❌ 客户端未启动")
|
||
|
||
print("="*50)
|
||
|
||
|
||
def show_usage_guide():
|
||
"""显示使用指南"""
|
||
print("\n" + "="*50)
|
||
print("📖 SUWood Blender客户端使用指南")
|
||
print("="*50)
|
||
print("1️⃣ 启动客户端:")
|
||
print(" client_manager = start_suwood_blender_client()")
|
||
print("")
|
||
print("2️⃣ 查看状态:")
|
||
print(" print_system_status(client_manager)")
|
||
print("")
|
||
print("3️⃣ 停止客户端:")
|
||
print(" client_manager.stop_client()")
|
||
print("")
|
||
print("4️⃣ 支持的命令类型:")
|
||
print(" - c04: 创建部件")
|
||
print(" - c05: 创建加工")
|
||
print(" - c08: 创建五金")
|
||
print(" - c09: 删除实体")
|
||
print(" - c0a: 删除加工")
|
||
print(" - c07: 创建尺寸标注")
|
||
print(" - 以及更多...")
|
||
print("")
|
||
print("💡 提示: 命令将从SUWood服务器自动接收并在Blender中执行")
|
||
print("="*50)
|
||
|
||
|
||
# ==================== 自动启动 ====================
|
||
|
||
if __name__ == "__main__":
|
||
# 显示使用指南
|
||
show_usage_guide()
|
||
|
||
# 自动启动客户端
|
||
print("\n🚀 自动启动客户端...")
|
||
client_manager = start_suwood_blender_client()
|
||
|
||
if client_manager:
|
||
try:
|
||
print("\n🔄 客户端运行中...")
|
||
print("💡 按 Ctrl+C 停止客户端")
|
||
|
||
# 定期显示状态
|
||
while True:
|
||
time.sleep(30) # 每30秒显示一次状态
|
||
print_system_status(client_manager)
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n🛑 收到停止信号...")
|
||
client_manager.stop_client()
|
||
print("👋 再见!")
|