#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ SUWood 数据监听器 用于获取从其他程序启动的SUWood服务器发送给suw_client的数据 """ import socket import json import struct import threading import time import queue from datetime import datetime from typing import Dict, Any, Optional, List, Callable class SUWoodDataListener: """SUWood数据监听器 - 监听服务器发送的数据""" def __init__(self, host="127.0.0.1", port=7999): self.host = host self.port = port self.sock = None self.running = False self.listener_thread = None self.data_queue = queue.Queue() self.callbacks = [] # 数据接收回调函数列表 self.seqno = 0 # 数据统计 self.total_received = 0 self.last_receive_time = None def add_callback(self, callback: Callable[[Dict[str, Any]], None]): """添加数据接收回调函数""" self.callbacks.append(callback) print(f"✅ 已添加数据回调函数: {callback.__name__}") def remove_callback(self, callback: Callable[[Dict[str, Any]], None]): """移除数据接收回调函数""" if callback in self.callbacks: self.callbacks.remove(callback) print(f"❌ 已移除数据回调函数: {callback.__name__}") def connect(self) -> bool: """连接到SUWood服务器""" try: self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.settimeout(10) # 设置连接超时 self.sock.connect((self.host, self.port)) print(f"✅ 成功连接到SUWood服务器 {self.host}:{self.port}") return True except Exception as e: print(f"❌ 连接SUWood服务器失败: {e}") self.sock = None return False def disconnect(self): """断开连接""" if self.sock: try: self.sock.close() except: pass self.sock = None print("🔌 已断开连接") def start_listening(self): """开始监听数据""" if self.running: print("⚠️ 监听器已在运行") return if not self.connect(): return self.running = True self.listener_thread = threading.Thread( target=self._listen_loop, daemon=True) self.listener_thread.start() print("🎧 开始监听SUWood服务器数据...") def stop_listening(self): """停止监听""" self.running = False self.disconnect() if self.listener_thread: self.listener_thread.join(timeout=3) print("⛔ 数据监听已停止") def _listen_loop(self): """监听循环""" while self.running and self.sock: try: # 定期发送心跳命令以保持连接并获取数据 self._send_heartbeat() # 接收服务器响应数据 data = self._receive_data() if data: self._process_received_data(data) time.sleep(1) # 1秒间隔 except Exception as e: print(f"❌ 监听过程中出错: {e}") if self.running: print("🔄 尝试重新连接...") self.disconnect() time.sleep(2) if not self.connect(): break def _send_heartbeat(self): """发送心跳命令获取数据""" try: # 发送获取命令列表的请求 msg = json.dumps({ "cmd": "get_cmds", "params": {"from": "listener"} }) # 基于测试结果,使用兼容协议 (0x01030001) # 因为我们确认了接收端使用0x01030002响应 self._send_message_compat(0x01, msg) except Exception as e: print(f"❌ 发送心跳失败: {e}") def _send_message(self, cmd: int, msg: str): """发送消息到服务器 - 标准协议""" if not self.sock: return False try: opcode = (cmd & 0xffff) | 0x01010000 # 标准协议: 0x01010001 self.seqno += 1 msg_bytes = msg.encode('utf-8') header = struct.pack('iiii', len(msg_bytes), opcode, self.seqno, 0) full_msg = header + msg_bytes self.sock.send(full_msg) return True except Exception as e: print(f"❌ 发送消息失败(标准协议): {e}") return False def _send_message_compat(self, cmd: int, msg: str): """发送消息到服务器 - 兼容协议""" if not self.sock: return False try: opcode = (cmd & 0xffff) | 0x01030000 # 兼容协议: 0x01030001 self.seqno += 1 msg_bytes = msg.encode('utf-8') header = struct.pack('iiii', len(msg_bytes), opcode, self.seqno, 0) full_msg = header + msg_bytes self.sock.send(full_msg) print(f"🔄 使用兼容协议发送消息: 0x{opcode:08x}") return True except Exception as e: print(f"❌ 发送消息失败(兼容协议): {e}") return False def _receive_data(self) -> Optional[Dict[str, Any]]: """接收服务器数据""" if not self.sock: return None try: # 设置非阻塞模式,避免永久等待 self.sock.settimeout(1.0) # 接收头部(16字节) header = self.sock.recv(16) if len(header) < 16: return None # 解包获取消息长度 msg_len, opcode, seqno, reserved = struct.unpack('iiii', header) # 接收消息内容 msg = b"" to_recv_len = msg_len while to_recv_len > 0: chunk = self.sock.recv(min(to_recv_len, 4096)) if not chunk: break msg += chunk to_recv_len = msg_len - len(msg) if len(msg) == msg_len: text_data = msg.decode('utf-8') parsed_data = json.loads(text_data) # 添加元数据 parsed_data['_meta'] = { 'opcode': opcode, 'seqno': seqno, 'reserved': reserved, 'receive_time': datetime.now().isoformat(), 'message_length': msg_len } return parsed_data except socket.timeout: # 超时是正常的,继续循环 pass except Exception as e: print(f"❌ 接收数据失败: {e}") return None def _process_received_data(self, data: Dict[str, Any]): """处理接收到的数据""" self.total_received += 1 self.last_receive_time = datetime.now() # 添加到队列 self.data_queue.put(data) # 调用回调函数 for callback in self.callbacks: try: callback(data) except Exception as e: print(f"❌ 回调函数 {callback.__name__} 执行失败: {e}") # 打印数据摘要 self._print_data_summary(data) def _print_data_summary(self, data: Dict[str, Any]): """打印数据摘要""" meta = data.get('_meta', {}) receive_time = meta.get('receive_time', 'unknown') print(f"📥 [{receive_time}] 收到数据:") print(f" 🔗 操作码: 0x{meta.get('opcode', 0):08x}") print(f" 📝 序列号: {meta.get('seqno', 0)}") print(f" 📊 数据大小: {meta.get('message_length', 0)} 字节") # 打印主要数据内容 if 'ret' in data: print(f" ✅ 返回状态: {data.get('ret')}") if 'data' in data: data_content = data.get('data', {}) if isinstance(data_content, dict): print(f" 📋 数据内容: {len(data_content)} 个字段") for key in list(data_content.keys())[:3]: # 显示前3个字段 print(f" • {key}: {type(data_content[key]).__name__}") print() def get_latest_data(self) -> Optional[Dict[str, Any]]: """获取最新的数据(非阻塞)""" try: return self.data_queue.get_nowait() except queue.Empty: return None def get_all_data(self) -> List[Dict[str, Any]]: """获取所有未处理的数据""" data_list = [] while True: try: data_list.append(self.data_queue.get_nowait()) except queue.Empty: break return data_list def wait_for_data(self, timeout: float = 10.0) -> Optional[Dict[str, Any]]: """等待接收数据(阻塞)""" try: return self.data_queue.get(timeout=timeout) except queue.Empty: return None def get_statistics(self) -> Dict[str, Any]: """获取统计信息""" return { "total_received": self.total_received, "last_receive_time": self.last_receive_time.isoformat() if self.last_receive_time else None, "queue_size": self.data_queue.qsize(), "is_running": self.running, "is_connected": self.sock is not None, "callbacks_count": len(self.callbacks) } # 回调函数示例 def print_suwood_data(data: Dict[str, Any]): """打印SUWood数据的回调函数""" print("🎯 SUWood数据回调:") print(f" 数据: {json.dumps(data, ensure_ascii=False, indent=2)}") def save_suwood_data(data: Dict[str, Any]): """保存SUWood数据的回调函数""" timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') filename = f"suwood_data_{timestamp}.json" try: with open(filename, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) print(f"💾 数据已保存到: {filename}") except Exception as e: print(f"❌ 保存数据失败: {e}") # 工具函数 def create_listener(host="127.0.0.1", port=7999) -> SUWoodDataListener: """创建SUWood数据监听器""" return SUWoodDataListener(host, port) def start_monitoring(host="127.0.0.1", port=7999, save_to_file=True, print_data=True): """开始监控SUWood服务器数据""" listener = create_listener(host, port) # 添加回调函数 if print_data: listener.add_callback(print_suwood_data) if save_to_file: listener.add_callback(save_suwood_data) # 开始监听 listener.start_listening() return listener if __name__ == "__main__": print("🎧 SUWood数据监听器") print("=" * 50) # 创建监听器 listener = start_monitoring() try: print("⌨️ 按 Ctrl+C 停止监听...") while True: time.sleep(1) stats = listener.get_statistics() if stats['total_received'] > 0: print( f"📊 已接收 {stats['total_received']} 条数据,队列中有 {stats['queue_size']} 条待处理") except KeyboardInterrupt: print("\n⛔ 停止监听...") listener.stop_listening() # 显示最终统计 final_stats = listener.get_statistics() print(f"📈 最终统计:") print(f" 总接收数据: {final_stats['total_received']} 条") print(f" 最后接收时间: {final_stats['last_receive_time']}") print("👋 监听结束")