blenderpython/test/blender_suw_core_independen...

463 lines
17 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Blender SUWood Core 独立客户端 - 命令执行修复版本
用途: 在Blender中使用独立的suw_core模块与SUWood服务器通信
版本: 3.9.0
作者: SUWood Team
功能特性:
1. 完全使用独立的suw_core模块化架构
2. 无需依赖原始的SUWImpl类
3. 启动suw_client与SUWood服务器通信
4. 使用命令分发器处理服务器命令
5. 在Blender中实时绘制3D模型
6. 支持所有SUWood操作(创建部件、加工、删除等)
7. 【修复】修复命令执行参数问题
"""
import sys
import os
import time
import threading
import datetime
import traceback
from typing import Dict, Any, Optional, List
import logging
# ==================== 路径配置 ====================
SUWOOD_PATH = r"D:\XL\code\blender\blenderpython"
print("🚀 Blender SUWood Core 独立客户端启动...")
print(f"📁 SUWood路径: {SUWOOD_PATH}")
# 添加路径到Python搜索路径
if SUWOOD_PATH not in sys.path:
sys.path.insert(0, SUWOOD_PATH)
# ==================== 独立SUWood客户端类 ====================
class IndependentSUWoodClient:
"""独立SUWood客户端 - 命令执行修复版本"""
def __init__(self):
"""初始化独立SUWood客户端"""
self.client = None
self.is_running = False
self.command_count = 0
self.success_count = 0
self.fail_count = 0
self.last_check_time = None
self.start_time = None
self.command_dispatcher = None
self.client_thread = None
def initialize_system(self):
"""初始化独立SUWood系统"""
try:
print("🔧 初始化独立SUWood系统...")
# 导入客户端模块
print("📡 导入客户端模块...")
from suw_client import SUWClient
# 创建客户端实例
print("🔗 创建客户端连接...")
self.client = SUWClient()
# 检查连接状态
if self.client.sock is None:
print("❌ 客户端连接失败")
return False
print("✅ 客户端连接成功")
# 测试连接
print("🔗 测试服务器连接...")
test_result = self._test_connection()
if test_result:
print("✅ 服务器连接正常")
# 初始化命令分发器
print("🔧 初始化命令分发器...")
if self._init_command_dispatcher():
print("✅ 命令分发器初始化完成")
return True
else:
print("❌ 命令分发器初始化失败")
return False
else:
print("❌ 服务器连接测试失败")
return False
except Exception as e:
print(f"❌ 独立SUWood初始化失败: {e}")
traceback.print_exc()
return False
def _init_command_dispatcher(self):
"""初始化命令分发器"""
try:
# 直接导入各个管理器模块
print("📦 导入管理器模块...")
# 导入数据管理器
from suw_core.data_manager import get_data_manager
data_manager = get_data_manager()
print("✅ DataManager 导入完成")
# 导入各个管理器
from suw_core.material_manager import MaterialManager
from suw_core.part_creator import PartCreator
from suw_core.machining_manager import MachiningManager
from suw_core.selection_manager import SelectionManager
from suw_core.deletion_manager import DeletionManager
from suw_core.hardware_manager import HardwareManager
from suw_core.door_drawer_manager import DoorDrawerManager
from suw_core.dimension_manager import DimensionManager
from suw_core.command_dispatcher import CommandDispatcher
print("✅ 所有管理器模块导入完成")
# 创建管理器实例
print("🔧 创建管理器实例...")
material_manager = MaterialManager()
part_creator = PartCreator()
machining_manager = MachiningManager()
selection_manager = SelectionManager()
deletion_manager = DeletionManager()
hardware_manager = HardwareManager()
door_drawer_manager = DoorDrawerManager()
dimension_manager = DimensionManager()
print("✅ 管理器实例创建完成")
# 创建命令分发器
self.command_dispatcher = CommandDispatcher()
print("✅ 命令分发器创建完成")
return True
except Exception as e:
print(f"❌ 命令分发器初始化失败: {e}")
traceback.print_exc()
return False
def _test_connection(self):
"""测试连接"""
try:
if not self.client or not self.client.sock:
return False
# 发送一个简单的测试消息
test_msg = '{"cmd": "test", "params": {"from": "su"}}'
if self.client.send_msg(0x01, test_msg):
print("✅ 测试消息发送成功")
return True
else:
print("❌ 测试消息发送失败")
return False
except Exception as e:
print(f"❌ 连接测试失败: {e}")
return False
def start_client(self):
"""启动客户端"""
try:
print("🌐 启动独立SUWood客户端...")
if not self.client:
print("❌ 客户端未初始化")
return False
self.is_running = True
self.start_time = datetime.datetime.now()
self.last_check_time = self.start_time
# 启动后台线程
print("🧵 启动客户端后台线程...")
self.client_thread = threading.Thread(
target=self._client_loop, daemon=True)
self.client_thread.start()
print("✅ 独立SUWood客户端启动成功!")
print("📋 系统信息:")
print(" 🏗️ 架构: 独立模块化架构")
print(" 🔗 服务器: 已连接")
print(" 🧵 客户端线程: 运行中")
print(" 🎯 命令执行: 已启用")
print("\n💡 使用说明:")
print(" 1. 客户端已连接到服务器")
print(" 2. 使用 check_commands() 手动检查新命令")
print(" 3. 使用 print_status() 查看状态")
print(" 4. 使用 stop_client() 停止客户端")
return True
except Exception as e:
print(f"❌ 客户端启动失败: {e}")
traceback.print_exc()
return False
def _client_loop(self):
"""客户端主循环 - 基于原始版本"""
print("🔄 进入客户端监听循环...")
consecutive_errors = 0
max_consecutive_errors = 10
try:
if not self.client or not self.client.sock:
print("❌ 无法连接到SUWood服务器")
return
print("✅ 已连接到SUWood服务器")
print("🎯 开始监听命令...")
while self.is_running:
try:
# 获取命令
from suw_client import get_cmds
commands = get_cmds()
if commands and len(commands) > 0:
print(f"\n 收到 {len(commands)} 个命令")
consecutive_errors = 0 # 重置错误计数
# 处理每个命令
for cmd in commands:
if not self.is_running:
break
self._process_command(cmd)
# 【关键】短暂休眠避免过度占用CPU - 这是原始版本的关键
time.sleep(0.1)
except KeyboardInterrupt:
print("\n🛑 收到中断信号,退出客户端循环")
break
except Exception as e:
consecutive_errors += 1
print(
f"❌ 客户端循环异常 ({consecutive_errors}/{max_consecutive_errors}): {e}")
if consecutive_errors >= max_consecutive_errors:
print(f"💀 连续错误过多,退出客户端循环")
break
# 错误后稍长休眠
time.sleep(1)
except Exception as e:
print(f"❌ 客户端线程异常: {e}")
print(" 客户端循环结束")
def check_commands(self):
"""手动检查命令 - 备用方法"""
try:
if not self.is_running or not self.client:
print("❌ 客户端未运行")
return
print(
f"\n 手动检查命令... (上次检查: {self.last_check_time.strftime('%H:%M:%S') if self.last_check_time else '从未'})")
# 使用get_cmds函数检查命令
from suw_client import get_cmds
cmds = get_cmds()
if cmds:
print(f" 收到 {len(cmds)} 个命令")
for cmd in cmds:
self._process_command(cmd)
else:
print("📭 暂无新命令")
self.last_check_time = datetime.datetime.now()
except Exception as e:
print(f"❌ 检查命令失败: {e}")
traceback.print_exc()
def _process_command(self, cmd_data):
"""处理命令 - 【修复】正确处理命令参数"""
from datetime import datetime
try:
self.command_count += 1
print(
f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}")
print(f"🎯 处理命令 #{self.command_count}: {cmd_data}")
# 解析命令数据
command_type = None
command_data = {}
# 处理不同的命令格式
if isinstance(cmd_data, dict):
if 'cmd' in cmd_data and 'data' in cmd_data:
# 格式: {'cmd': 'set_cmd', 'data': {'cmd': 'c04', ...}}
command_type = cmd_data['data'].get('cmd')
command_data = cmd_data['data']
elif 'cmd' in cmd_data:
# 格式: {'cmd': 'c04', ...}
command_type = cmd_data['cmd']
command_data = cmd_data
else:
print(f"⚠️ 无法解析命令格式: {cmd_data}")
return
if command_type:
print(f"🔧 执行命令: {command_type}")
# 使用命令分发器执行命令 - 【修复】传递正确的参数
if self.command_dispatcher:
result = self.command_dispatcher.dispatch_command(
command_type, command_data)
if result:
print(f"✅ 命令 {command_type} 执行成功")
self.success_count += 1
print("")
else:
print(f"❌ 命令 {command_type} 执行失败")
self.fail_count += 1
print("")
else:
print("⚠️ 命令分发器未初始化,只记录命令")
print("")
self.success_count += 1
else:
print(f"⚠️ 无法识别命令类型: {cmd_data}")
self.fail_count += 1
print("")
except Exception as e:
print(f"❌ 命令处理失败: {e}")
self.fail_count += 1
print("")
traceback.print_exc()
def print_status(self):
"""打印状态"""
if not self.is_running:
print("❌ 客户端未运行")
return
runtime = datetime.datetime.now(
) - self.start_time if self.start_time else datetime.timedelta(0)
success_rate = (self.success_count / self.command_count *
100) if self.command_count > 0 else 0
thread_alive = self.client_thread.is_alive() if self.client_thread else False
print("\n==================================================")
print("📊 独立SUWood客户端状态")
print("==================================================")
print(f"🔄 运行状态: {'✅ 运行中' if self.is_running else '❌ 已停止'}")
print(f"🧵 线程状态: {'✅ 活跃' if thread_alive else '❌ 停止'}")
print(f"🏗️ 架构模式: 独立模块化架构")
print(f"⏱️ 运行时间: {runtime}")
print(f"📈 命令统计:")
print(f" 总计: {self.command_count}")
print(f" 成功: {self.success_count}")
print(f" 失败: {self.fail_count}")
print(f" 成功率: {success_rate:.1f}%")
print(
f"🔍 最后检查: {self.last_check_time.strftime('%H:%M:%S') if self.last_check_time else '从未'}")
print(f"🎯 命令分发器: {'✅ 已初始化' if self.command_dispatcher else '❌ 未初始化'}")
print("==================================================")
def stop_client(self):
"""停止客户端"""
try:
print("🛑 停止独立SUWood客户端...")
self.is_running = False
if self.client_thread and self.client_thread.is_alive():
self.client_thread.join(timeout=2)
if self.client:
self.client.disconnect()
print("✅ 客户端已停止")
except Exception as e:
print(f"❌ 停止客户端失败: {e}")
traceback.print_exc()
# ==================== 全局客户端实例 ====================
independent_suwood_client = IndependentSUWoodClient()
# ==================== 便捷函数 ====================
def start_independent_suwood_client():
"""启动独立SUWood客户端"""
print(" 自动启动独立客户端...")
print("开始启动独立SUWood客户端...")
if independent_suwood_client.initialize_system():
if independent_suwood_client.start_client():
print("\n🎉 独立SUWood客户端启动成功!")
print(" 现在可以从SUWood服务器发送命令来在Blender中绘制模型了!")
print("\n💡 客户端正在后台自动监听命令")
print("💡 也可以使用 check_commands() 手动检查新命令")
return True
else:
print("❌ 客户端启动失败")
return False
else:
print("❌ 系统初始化失败")
return False
def check_commands():
"""检查命令 - 手动调用"""
independent_suwood_client.check_commands()
def print_independent_system_status(client=None):
"""打印独立系统状态"""
if client is None:
client = independent_suwood_client
client.print_status()
def stop_independent_suwood_client():
"""停止独立SUWood客户端"""
independent_suwood_client.stop_client()
# ==================== 使用指南 ====================
print("\n==================================================")
print(" 独立SUWood客户端使用指南")
print("==================================================")
print("1⃣ 启动客户端:")
print(" start_independent_suwood_client()")
print("\n2⃣ 手动检查命令:")
print(" check_commands()")
print("\n3⃣ 查看状态:")
print(" print_independent_system_status(independent_suwood_client)")
print("\n4⃣ 停止客户端:")
print(" independent_suwood_client.stop_client()")
print("\n💡 特点: 完全独立的模块化架构,后台自动监听")
print("==================================================\n")
# ==================== 自动启动 ====================
print("🚀 自动启动独立客户端...")
start_independent_suwood_client()
print("\n💡 提示: 客户端正在后台自动监听命令")
print("💡 提示: 也可以使用 check_commands() 手动检查新命令")
print("💡 提示: 使用 print_independent_system_status() 查看状态")