blenderpython/suw_client.py

443 lines
14 KiB
Python
Raw Permalink Normal View History

2025-08-01 17:13:30 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SUW Client - Python翻译版本
原文件: SUWClient.rb
用途: TCP客户端与服务器通信
"""
import socket
import json
import struct
import threading
import time
from typing import List, Dict, Any, Optional
# 常量定义
TCP_SERVER_PORT = 7999
OP_CMD_REQ_GETCMDS = 0x01
OP_CMD_REQ_SETCMD = 0x03
OP_CMD_RES_GETCMDS = 0x02
OP_CMD_RES_SETCMD = 0x04
class SUWClient:
"""SUWood 客户端类"""
def __init__(self, host="127.0.0.1", port=TCP_SERVER_PORT):
self.host = host
self.port = port
self.sock = None
self.seqno = 0
self.connect()
def connect(self):
"""连接到服务器"""
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.host, self.port))
print(f"✅ 连接到服务器 {self.host}:{self.port}")
except Exception as e:
print(f"❌ 连接失败: {e}")
self.sock = None
def reconnect(self):
"""重新连接"""
if self.sock:
try:
self.sock.close()
except:
pass
self.connect()
def send_msg(self, cmd: int, msg: str):
"""发送消息"""
if not self.sock:
print("❌ 未连接到服务器")
return False
try:
opcode = (cmd & 0xffff) | 0x01010000
self.seqno += 1
# 打包消息:[消息长度, 操作码, 序列号, 保留字段]
msg_bytes = msg.encode('utf-8')
header = struct.pack('iiii', len(msg_bytes), opcode, self.seqno, 0)
full_msg = header + msg_bytes
self.sock.send(full_msg)
return True
except Exception as e:
print(f"❌ 发送消息失败: {e}")
return False
def recv_msg(self) -> Optional[str]:
"""接收消息 - 修复编码问题"""
if not self.sock:
print("❌ 未连接到服务器")
return None
try:
# 接收头部16字节
header = self.sock.recv(16)
if len(header) < 16:
return None
# 解包获取消息长度
msg_len = struct.unpack('iiii', header)[0]
# 接收消息内容
msg = b""
to_recv_len = msg_len
while to_recv_len > 0:
chunk = self.sock.recv(to_recv_len)
if not chunk:
break
msg += chunk
to_recv_len = msg_len - len(msg)
# 【修复】改进编码处理
if not msg:
return None
# 首先尝试UTF-8
try:
return msg.decode('utf-8')
except UnicodeDecodeError:
# 尝试其他编码
encodings = ['latin1', 'gbk', 'gb2312', 'cp1252', 'iso-8859-1']
for encoding in encodings:
try:
decoded = msg.decode(encoding)
if encoding != 'utf-8':
print(f"⚠️ 使用 {encoding} 编码解码成功")
return decoded
except UnicodeDecodeError:
continue
# 如果所有编码都失败,使用错误处理模式
print("⚠️ 所有编码都失败,使用错误处理模式")
return msg.decode('utf-8', errors='ignore')
except Exception as e:
print(f"❌ 接收消息失败: {e}")
return None
# 全局客户端实例
_client_instance = None
def get_client():
"""获取客户端实例"""
global _client_instance
if _client_instance is None:
_client_instance = SUWClient()
return _client_instance
def get_cmds() -> List[Dict[str, Any]]:
"""获取命令列表 - 修复错误处理"""
msg = json.dumps({
"cmd": "get_cmds",
"params": {"from": "su"}
})
client = get_client()
cmds = []
try:
if client.send_msg(OP_CMD_REQ_GETCMDS, msg):
res = client.recv_msg()
if res:
try:
# 尝试清理响应数据,移除可能的截断部分
cleaned_res = res.strip()
# 如果响应以 } 结尾尝试找到完整的JSON
if cleaned_res.endswith('}'):
# 尝试从后往前找到匹配的 { 和 }
brace_count = 0
end_pos = len(cleaned_res) - 1
for i in range(end_pos, -1, -1):
if cleaned_res[i] == '}':
brace_count += 1
elif cleaned_res[i] == '{':
brace_count -= 1
if brace_count == 0:
# 找到完整的JSON
cleaned_res = cleaned_res[:i+1]
break
res_hash = json.loads(cleaned_res)
if res_hash.get('ret') == 1:
cmds = res_hash.get('data', {}).get('cmds', [])
# 只在有命令时输出日志
if cmds:
print(f"✅ 成功获取 {len(cmds)} 个命令")
else:
print(f"⚠️ 服务器返回错误: {res_hash.get('msg', '未知错误')}")
except json.JSONDecodeError as e:
# 只在调试模式下打印详细错误信息
if len(res) > 200: # 只对长响应打印详细信息
print(f"⚠️ JSON解析失败: {e}")
print(f"原始响应: {res[:200]}...")
# 尝试更激进的清理
try:
# 查找可能的JSON开始位置
start_pos = res.find('{')
if start_pos != -1:
# 尝试找到匹配的结束位置
brace_count = 0
for i in range(start_pos, len(res)):
if res[i] == '{':
brace_count += 1
elif res[i] == '}':
brace_count -= 1
if brace_count == 0:
cleaned_res = res[start_pos:i+1]
res_hash = json.loads(cleaned_res)
if res_hash.get('ret') == 1:
cmds = res_hash.get(
'data', {}).get('cmds', [])
# 只在有命令时输出日志
if cmds:
print(
f"✅ 修复后成功获取 {len(cmds)} 个命令")
# 【关键修复】确保修复后的命令被返回
return cmds
break
except:
print("❌ 无法修复JSON格式")
return []
except Exception as e:
print("========= get_cmds err is: =========")
print(e)
print("========= get_cmds res is: =========")
if 'res' in locals():
print(f"响应内容: {res[:200]}...")
else:
print("No response")
# 尝试重新连接
try:
client.reconnect()
except:
pass
return cmds
def set_cmd(cmd: str, params: Dict[str, Any]):
"""设置命令"""
cmds = {
"cmd": "set_cmd",
"params": params.copy()
}
cmds["params"]["from"] = "su"
cmds["params"]["cmd"] = cmd
msg = json.dumps(cmds)
client = get_client()
try:
if client.send_msg(OP_CMD_REQ_SETCMD, msg):
client.recv_msg() # 接收响应但不处理
except Exception as e:
print(f"❌ set_cmd 错误: {e}")
client.reconnect()
class CommandProcessor:
"""命令处理器"""
def __init__(self):
self.cmds_queue = []
self.pause = 0
self.running = False
self.timer_thread = None
def start(self):
"""启动命令处理器"""
if self.running:
return
self.running = True
self.timer_thread = threading.Thread(
target=self._timer_loop, daemon=True)
self.timer_thread.start()
print("✅ 命令处理器已启动")
def stop(self):
"""停止命令处理器"""
self.running = False
if self.timer_thread:
self.timer_thread.join(timeout=2)
print("⛔ 命令处理器已停止")
def _timer_loop(self):
"""定时器循环"""
while self.running:
try:
if self.pause > 0:
self.pause -= 1
else:
self._process_commands()
time.sleep(1) # 1秒间隔
except Exception as e:
print(f"❌ 命令处理循环错误: {e}")
time.sleep(1)
def _process_commands(self):
"""处理命令"""
try:
# 获取新命令
swcmds0 = get_cmds()
swcmds = self.cmds_queue + swcmds0
self.cmds_queue.clear()
# 处理每个命令
for swcmd in swcmds:
self._execute_command(swcmd)
except Exception as e:
print(f"❌ 处理命令时出错: {e}")
def _execute_command(self, swcmd: Dict[str, Any]):
"""执行单个命令"""
try:
data = swcmd.get("data")
if isinstance(data, str):
# 直接执行字符串命令(注意安全性)
print(f"执行字符串命令: {data}")
# 在实际应用中,这里应该更安全地执行命令
elif isinstance(data, dict) and "cmd" in data:
cmd = data.get("cmd")
print(f"执行命令: {cmd}, 数据: {data}")
if self.pause > 0:
self.cmds_queue.append(swcmd)
elif cmd.startswith("pause_"):
self.pause = data.get("value", 1)
else:
pre_pause_time = data.get("pre_pause", 0)
if pre_pause_time > 0:
data_copy = data.copy()
del data_copy["pre_pause"]
swcmd_copy = swcmd.copy()
swcmd_copy["data"] = data_copy
self.pause = pre_pause_time
self.cmds_queue.append(swcmd_copy)
else:
# 执行命令
self._call_suwood_method(cmd, data)
after_pause_time = data.get("after_pause", 0)
if after_pause_time > 0:
self.pause = after_pause_time
except Exception as e:
print(f"❌ 执行命令时出错: {e}")
def _call_suwood_method(self, cmd: str, data: Dict[str, Any]):
"""调用SUWood方法"""
try:
# 这里需要导入suw_core并调用相应方法
from .suw_core import get_selection_manager, get_machining_manager, get_deletion_manager
# 根据命令类型调用相应的管理器
if cmd.startswith("sel_"):
# 选择相关命令
selection_manager = get_selection_manager()
if hasattr(selection_manager, cmd):
method = getattr(selection_manager, cmd)
method(data)
else:
print(f"⚠️ 选择管理器方法不存在: {cmd}")
elif cmd.startswith("mach_"):
# 加工相关命令
machining_manager = get_machining_manager()
if hasattr(machining_manager, cmd):
method = getattr(machining_manager, cmd)
method(data)
else:
print(f"⚠️ 加工管理器方法不存在: {cmd}")
elif cmd.startswith("del_"):
# 删除相关命令
deletion_manager = get_deletion_manager()
if hasattr(deletion_manager, cmd):
method = getattr(deletion_manager, cmd)
method(data)
else:
print(f"⚠️ 删除管理器方法不存在: {cmd}")
else:
print(f"⚠️ 未知命令类型: {cmd}")
except ImportError:
print("⚠️ suw_core 模块未找到")
except Exception as e:
print(f"❌ 调用SUWood方法时出错: {e}")
# 全局命令处理器实例
_processor_instance = None
def get_processor():
"""获取命令处理器实例"""
global _processor_instance
if _processor_instance is None:
_processor_instance = CommandProcessor()
return _processor_instance
def start_command_processor():
"""启动命令处理器"""
processor = get_processor()
processor.start()
def stop_command_processor():
"""停止命令处理器"""
processor = get_processor()
processor.stop()
# 自动启动命令处理器(可选)
if __name__ == "__main__":
print("🚀 SUW客户端测试")
# 测试连接
client = get_client()
if client.sock:
print("连接成功,测试获取命令...")
cmds = get_cmds()
print(f"获取到 {len(cmds)} 个命令")
# 启动命令处理器
start_command_processor()
try:
# 保持运行
while True:
time.sleep(10)
except KeyboardInterrupt:
print("\n停止客户端...")
stop_command_processor()
else:
print("连接失败")