#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ SUW Client - Python翻译版本 原文件: SUWClient.rb 用途: TCP客户端,与服务器通信 """ import socket import json import struct import threading import time from typing import List, Dict, Any, Optional # 常量定义 TCP_SERVER_PORT = 7999 OP_CMD_REQ_GETCMDS = 0x01 OP_CMD_REQ_SETCMD = 0x03 OP_CMD_RES_GETCMDS = 0x02 OP_CMD_RES_SETCMD = 0x04 class SUWClient: """SUWood 客户端类""" def __init__(self, host="127.0.0.1", port=TCP_SERVER_PORT): self.host = host self.port = port self.sock = None self.seqno = 0 self.connect() def connect(self): """连接到服务器""" try: self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((self.host, self.port)) print(f"✅ 连接到服务器 {self.host}:{self.port}") except Exception as e: print(f"❌ 连接失败: {e}") self.sock = None def reconnect(self): """重新连接""" if self.sock: try: self.sock.close() except: pass self.connect() def send_msg(self, cmd: int, msg: str): """发送消息""" if not self.sock: print("❌ 未连接到服务器") return False try: opcode = (cmd & 0xffff) | 0x01010000 self.seqno += 1 # 打包消息:[消息长度, 操作码, 序列号, 保留字段] msg_bytes = msg.encode('utf-8') header = struct.pack('iiii', len(msg_bytes), opcode, self.seqno, 0) full_msg = header + msg_bytes self.sock.send(full_msg) return True except Exception as e: print(f"❌ 发送消息失败: {e}") return False def recv_msg(self) -> Optional[str]: """接收消息""" if not self.sock: print("❌ 未连接到服务器") return None try: # 接收头部(16字节) header = self.sock.recv(16) if len(header) < 16: return None # 解包获取消息长度 msg_len = struct.unpack('iiii', header)[0] # 接收消息内容 msg = b"" to_recv_len = msg_len while to_recv_len > 0: chunk = self.sock.recv(to_recv_len) if not chunk: break msg += chunk to_recv_len = msg_len - len(msg) return msg.decode('utf-8') except Exception as e: print(f"❌ 接收消息失败: {e}") return None # 全局客户端实例 _client_instance = None def get_client(): """获取客户端实例""" global _client_instance if _client_instance is None: _client_instance = SUWClient() return _client_instance def get_cmds() -> List[Dict[str, Any]]: """获取命令列表""" msg = json.dumps({ "cmd": "get_cmds", "params": {"from": "su"} }) client = get_client() cmds = [] try: if client.send_msg(OP_CMD_REQ_GETCMDS, msg): res = client.recv_msg() if res: res_hash = json.loads(res) if res_hash.get('ret') == 1: cmds = res_hash.get('data', {}).get('cmds', []) except Exception as e: print("========= get_cmds err is: =========") print(e) print("========= get_cmds res is: =========") print(res if 'res' in locals() else "No response") client.reconnect() return cmds def set_cmd(cmd: str, params: Dict[str, Any]): """设置命令""" cmds = { "cmd": "set_cmd", "params": params.copy() } cmds["params"]["from"] = "su" cmds["params"]["cmd"] = cmd msg = json.dumps(cmds) client = get_client() try: if client.send_msg(OP_CMD_REQ_SETCMD, msg): client.recv_msg() # 接收响应但不处理 except Exception as e: print(f"❌ set_cmd 错误: {e}") client.reconnect() class CommandProcessor: """命令处理器""" def __init__(self): self.cmds_queue = [] self.pause = 0 self.running = False self.timer_thread = None def start(self): """启动命令处理器""" if self.running: return self.running = True self.timer_thread = threading.Thread(target=self._timer_loop, daemon=True) self.timer_thread.start() print("✅ 命令处理器已启动") def stop(self): """停止命令处理器""" self.running = False if self.timer_thread: self.timer_thread.join(timeout=2) print("⛔ 命令处理器已停止") def _timer_loop(self): """定时器循环""" while self.running: try: if self.pause > 0: self.pause -= 1 else: self._process_commands() time.sleep(1) # 1秒间隔 except Exception as e: print(f"❌ 命令处理循环错误: {e}") time.sleep(1) def _process_commands(self): """处理命令""" try: # 获取新命令 swcmds0 = get_cmds() swcmds = self.cmds_queue + swcmds0 self.cmds_queue.clear() # 处理每个命令 for swcmd in swcmds: self._execute_command(swcmd) except Exception as e: print(f"❌ 处理命令时出错: {e}") def _execute_command(self, swcmd: Dict[str, Any]): """执行单个命令""" try: data = swcmd.get("data") if isinstance(data, str): # 直接执行字符串命令(注意安全性) print(f"执行字符串命令: {data}") # 在实际应用中,这里应该更安全地执行命令 elif isinstance(data, dict) and "cmd" in data: cmd = data.get("cmd") print(f"执行命令: {cmd}, 数据: {data}") if self.pause > 0: self.cmds_queue.append(swcmd) elif cmd.startswith("pause_"): self.pause = data.get("value", 1) else: pre_pause_time = data.get("pre_pause", 0) if pre_pause_time > 0: data_copy = data.copy() del data_copy["pre_pause"] swcmd_copy = swcmd.copy() swcmd_copy["data"] = data_copy self.pause = pre_pause_time self.cmds_queue.append(swcmd_copy) else: # 执行命令 self._call_suwood_method(cmd, data) after_pause_time = data.get("after_pause", 0) if after_pause_time > 0: self.pause = after_pause_time except Exception as e: print(f"❌ 执行命令时出错: {e}") def _call_suwood_method(self, cmd: str, data: Dict[str, Any]): """调用SUWood方法""" try: # 这里需要导入SUWImpl并调用相应方法 from .suw_impl import SUWImpl # 获取SUWImpl实例 impl_instance = SUWImpl.get_instance() # 调用方法 if hasattr(impl_instance, cmd): method = getattr(impl_instance, cmd) method(data) else: print(f"⚠️ 方法不存在: {cmd}") except ImportError: print("⚠️ SUWImpl 模块未找到") except Exception as e: print(f"❌ 调用SUWood方法时出错: {e}") # 全局命令处理器实例 _processor_instance = None def get_processor(): """获取命令处理器实例""" global _processor_instance if _processor_instance is None: _processor_instance = CommandProcessor() return _processor_instance def start_command_processor(): """启动命令处理器""" processor = get_processor() processor.start() def stop_command_processor(): """停止命令处理器""" processor = get_processor() processor.stop() # 自动启动命令处理器(可选) if __name__ == "__main__": print("🚀 SUW客户端测试") # 测试连接 client = get_client() if client.sock: print("连接成功,测试获取命令...") cmds = get_cmds() print(f"获取到 {len(cmds)} 个命令") # 启动命令处理器 start_command_processor() try: # 保持运行 while True: time.sleep(10) except KeyboardInterrupt: print("\n停止客户端...") stop_command_processor() else: print("连接失败")