373 lines
12 KiB
Python
373 lines
12 KiB
Python
|
#!/usr/bin/env python3
|
|||
|
# -*- coding: utf-8 -*-
|
|||
|
"""
|
|||
|
SUWood 数据监听器
|
|||
|
用于获取从其他程序启动的SUWood服务器发送给suw_client的数据
|
|||
|
"""
|
|||
|
|
|||
|
import socket
|
|||
|
import json
|
|||
|
import struct
|
|||
|
import threading
|
|||
|
import time
|
|||
|
import queue
|
|||
|
from datetime import datetime
|
|||
|
from typing import Dict, Any, Optional, List, Callable
|
|||
|
|
|||
|
|
|||
|
class SUWoodDataListener:
|
|||
|
"""SUWood数据监听器 - 监听服务器发送的数据"""
|
|||
|
|
|||
|
def __init__(self, host="127.0.0.1", port=7999):
|
|||
|
self.host = host
|
|||
|
self.port = port
|
|||
|
self.sock = None
|
|||
|
self.running = False
|
|||
|
self.listener_thread = None
|
|||
|
self.data_queue = queue.Queue()
|
|||
|
self.callbacks = [] # 数据接收回调函数列表
|
|||
|
self.seqno = 0
|
|||
|
|
|||
|
# 数据统计
|
|||
|
self.total_received = 0
|
|||
|
self.last_receive_time = None
|
|||
|
|
|||
|
def add_callback(self, callback: Callable[[Dict[str, Any]], None]):
|
|||
|
"""添加数据接收回调函数"""
|
|||
|
self.callbacks.append(callback)
|
|||
|
print(f"✅ 已添加数据回调函数: {callback.__name__}")
|
|||
|
|
|||
|
def remove_callback(self, callback: Callable[[Dict[str, Any]], None]):
|
|||
|
"""移除数据接收回调函数"""
|
|||
|
if callback in self.callbacks:
|
|||
|
self.callbacks.remove(callback)
|
|||
|
print(f"❌ 已移除数据回调函数: {callback.__name__}")
|
|||
|
|
|||
|
def connect(self) -> bool:
|
|||
|
"""连接到SUWood服务器"""
|
|||
|
try:
|
|||
|
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|||
|
self.sock.settimeout(10) # 设置连接超时
|
|||
|
self.sock.connect((self.host, self.port))
|
|||
|
print(f"✅ 成功连接到SUWood服务器 {self.host}:{self.port}")
|
|||
|
return True
|
|||
|
except Exception as e:
|
|||
|
print(f"❌ 连接SUWood服务器失败: {e}")
|
|||
|
self.sock = None
|
|||
|
return False
|
|||
|
|
|||
|
def disconnect(self):
|
|||
|
"""断开连接"""
|
|||
|
if self.sock:
|
|||
|
try:
|
|||
|
self.sock.close()
|
|||
|
except:
|
|||
|
pass
|
|||
|
self.sock = None
|
|||
|
print("🔌 已断开连接")
|
|||
|
|
|||
|
def start_listening(self):
|
|||
|
"""开始监听数据"""
|
|||
|
if self.running:
|
|||
|
print("⚠️ 监听器已在运行")
|
|||
|
return
|
|||
|
|
|||
|
if not self.connect():
|
|||
|
return
|
|||
|
|
|||
|
self.running = True
|
|||
|
self.listener_thread = threading.Thread(
|
|||
|
target=self._listen_loop, daemon=True)
|
|||
|
self.listener_thread.start()
|
|||
|
print("🎧 开始监听SUWood服务器数据...")
|
|||
|
|
|||
|
def stop_listening(self):
|
|||
|
"""停止监听"""
|
|||
|
self.running = False
|
|||
|
self.disconnect()
|
|||
|
|
|||
|
if self.listener_thread:
|
|||
|
self.listener_thread.join(timeout=3)
|
|||
|
|
|||
|
print("⛔ 数据监听已停止")
|
|||
|
|
|||
|
def _listen_loop(self):
|
|||
|
"""监听循环"""
|
|||
|
while self.running and self.sock:
|
|||
|
try:
|
|||
|
# 定期发送心跳命令以保持连接并获取数据
|
|||
|
self._send_heartbeat()
|
|||
|
|
|||
|
# 接收服务器响应数据
|
|||
|
data = self._receive_data()
|
|||
|
if data:
|
|||
|
self._process_received_data(data)
|
|||
|
|
|||
|
time.sleep(1) # 1秒间隔
|
|||
|
|
|||
|
except Exception as e:
|
|||
|
print(f"❌ 监听过程中出错: {e}")
|
|||
|
if self.running:
|
|||
|
print("🔄 尝试重新连接...")
|
|||
|
self.disconnect()
|
|||
|
time.sleep(2)
|
|||
|
if not self.connect():
|
|||
|
break
|
|||
|
|
|||
|
def _send_heartbeat(self):
|
|||
|
"""发送心跳命令获取数据"""
|
|||
|
try:
|
|||
|
# 发送获取命令列表的请求
|
|||
|
msg = json.dumps({
|
|||
|
"cmd": "get_cmds",
|
|||
|
"params": {"from": "listener"}
|
|||
|
})
|
|||
|
|
|||
|
# 基于测试结果,使用兼容协议 (0x01030001)
|
|||
|
# 因为我们确认了接收端使用0x01030002响应
|
|||
|
self._send_message_compat(0x01, msg)
|
|||
|
|
|||
|
except Exception as e:
|
|||
|
print(f"❌ 发送心跳失败: {e}")
|
|||
|
|
|||
|
def _send_message(self, cmd: int, msg: str):
|
|||
|
"""发送消息到服务器 - 标准协议"""
|
|||
|
if not self.sock:
|
|||
|
return False
|
|||
|
|
|||
|
try:
|
|||
|
opcode = (cmd & 0xffff) | 0x01010000 # 标准协议: 0x01010001
|
|||
|
self.seqno += 1
|
|||
|
|
|||
|
msg_bytes = msg.encode('utf-8')
|
|||
|
header = struct.pack('iiii', len(msg_bytes), opcode, self.seqno, 0)
|
|||
|
|
|||
|
full_msg = header + msg_bytes
|
|||
|
self.sock.send(full_msg)
|
|||
|
return True
|
|||
|
|
|||
|
except Exception as e:
|
|||
|
print(f"❌ 发送消息失败(标准协议): {e}")
|
|||
|
return False
|
|||
|
|
|||
|
def _send_message_compat(self, cmd: int, msg: str):
|
|||
|
"""发送消息到服务器 - 兼容协议"""
|
|||
|
if not self.sock:
|
|||
|
return False
|
|||
|
|
|||
|
try:
|
|||
|
opcode = (cmd & 0xffff) | 0x01030000 # 兼容协议: 0x01030001
|
|||
|
self.seqno += 1
|
|||
|
|
|||
|
msg_bytes = msg.encode('utf-8')
|
|||
|
header = struct.pack('iiii', len(msg_bytes), opcode, self.seqno, 0)
|
|||
|
|
|||
|
full_msg = header + msg_bytes
|
|||
|
self.sock.send(full_msg)
|
|||
|
print(f"🔄 使用兼容协议发送消息: 0x{opcode:08x}")
|
|||
|
return True
|
|||
|
|
|||
|
except Exception as e:
|
|||
|
print(f"❌ 发送消息失败(兼容协议): {e}")
|
|||
|
return False
|
|||
|
|
|||
|
def _receive_data(self) -> Optional[Dict[str, Any]]:
|
|||
|
"""接收服务器数据"""
|
|||
|
if not self.sock:
|
|||
|
return None
|
|||
|
|
|||
|
try:
|
|||
|
# 设置非阻塞模式,避免永久等待
|
|||
|
self.sock.settimeout(1.0)
|
|||
|
|
|||
|
# 接收头部(16字节)
|
|||
|
header = self.sock.recv(16)
|
|||
|
if len(header) < 16:
|
|||
|
return None
|
|||
|
|
|||
|
# 解包获取消息长度
|
|||
|
msg_len, opcode, seqno, reserved = struct.unpack('iiii', header)
|
|||
|
|
|||
|
# 接收消息内容
|
|||
|
msg = b""
|
|||
|
to_recv_len = msg_len
|
|||
|
|
|||
|
while to_recv_len > 0:
|
|||
|
chunk = self.sock.recv(min(to_recv_len, 4096))
|
|||
|
if not chunk:
|
|||
|
break
|
|||
|
msg += chunk
|
|||
|
to_recv_len = msg_len - len(msg)
|
|||
|
|
|||
|
if len(msg) == msg_len:
|
|||
|
text_data = msg.decode('utf-8')
|
|||
|
parsed_data = json.loads(text_data)
|
|||
|
|
|||
|
# 添加元数据
|
|||
|
parsed_data['_meta'] = {
|
|||
|
'opcode': opcode,
|
|||
|
'seqno': seqno,
|
|||
|
'reserved': reserved,
|
|||
|
'receive_time': datetime.now().isoformat(),
|
|||
|
'message_length': msg_len
|
|||
|
}
|
|||
|
|
|||
|
return parsed_data
|
|||
|
|
|||
|
except socket.timeout:
|
|||
|
# 超时是正常的,继续循环
|
|||
|
pass
|
|||
|
except Exception as e:
|
|||
|
print(f"❌ 接收数据失败: {e}")
|
|||
|
|
|||
|
return None
|
|||
|
|
|||
|
def _process_received_data(self, data: Dict[str, Any]):
|
|||
|
"""处理接收到的数据"""
|
|||
|
self.total_received += 1
|
|||
|
self.last_receive_time = datetime.now()
|
|||
|
|
|||
|
# 添加到队列
|
|||
|
self.data_queue.put(data)
|
|||
|
|
|||
|
# 调用回调函数
|
|||
|
for callback in self.callbacks:
|
|||
|
try:
|
|||
|
callback(data)
|
|||
|
except Exception as e:
|
|||
|
print(f"❌ 回调函数 {callback.__name__} 执行失败: {e}")
|
|||
|
|
|||
|
# 打印数据摘要
|
|||
|
self._print_data_summary(data)
|
|||
|
|
|||
|
def _print_data_summary(self, data: Dict[str, Any]):
|
|||
|
"""打印数据摘要"""
|
|||
|
meta = data.get('_meta', {})
|
|||
|
receive_time = meta.get('receive_time', 'unknown')
|
|||
|
|
|||
|
print(f"📥 [{receive_time}] 收到数据:")
|
|||
|
print(f" 🔗 操作码: 0x{meta.get('opcode', 0):08x}")
|
|||
|
print(f" 📝 序列号: {meta.get('seqno', 0)}")
|
|||
|
print(f" 📊 数据大小: {meta.get('message_length', 0)} 字节")
|
|||
|
|
|||
|
# 打印主要数据内容
|
|||
|
if 'ret' in data:
|
|||
|
print(f" ✅ 返回状态: {data.get('ret')}")
|
|||
|
|
|||
|
if 'data' in data:
|
|||
|
data_content = data.get('data', {})
|
|||
|
if isinstance(data_content, dict):
|
|||
|
print(f" 📋 数据内容: {len(data_content)} 个字段")
|
|||
|
for key in list(data_content.keys())[:3]: # 显示前3个字段
|
|||
|
print(f" • {key}: {type(data_content[key]).__name__}")
|
|||
|
|
|||
|
print()
|
|||
|
|
|||
|
def get_latest_data(self) -> Optional[Dict[str, Any]]:
|
|||
|
"""获取最新的数据(非阻塞)"""
|
|||
|
try:
|
|||
|
return self.data_queue.get_nowait()
|
|||
|
except queue.Empty:
|
|||
|
return None
|
|||
|
|
|||
|
def get_all_data(self) -> List[Dict[str, Any]]:
|
|||
|
"""获取所有未处理的数据"""
|
|||
|
data_list = []
|
|||
|
while True:
|
|||
|
try:
|
|||
|
data_list.append(self.data_queue.get_nowait())
|
|||
|
except queue.Empty:
|
|||
|
break
|
|||
|
return data_list
|
|||
|
|
|||
|
def wait_for_data(self, timeout: float = 10.0) -> Optional[Dict[str, Any]]:
|
|||
|
"""等待接收数据(阻塞)"""
|
|||
|
try:
|
|||
|
return self.data_queue.get(timeout=timeout)
|
|||
|
except queue.Empty:
|
|||
|
return None
|
|||
|
|
|||
|
def get_statistics(self) -> Dict[str, Any]:
|
|||
|
"""获取统计信息"""
|
|||
|
return {
|
|||
|
"total_received": self.total_received,
|
|||
|
"last_receive_time": self.last_receive_time.isoformat() if self.last_receive_time else None,
|
|||
|
"queue_size": self.data_queue.qsize(),
|
|||
|
"is_running": self.running,
|
|||
|
"is_connected": self.sock is not None,
|
|||
|
"callbacks_count": len(self.callbacks)
|
|||
|
}
|
|||
|
|
|||
|
# 回调函数示例
|
|||
|
|
|||
|
|
|||
|
def print_suwood_data(data: Dict[str, Any]):
|
|||
|
"""打印SUWood数据的回调函数"""
|
|||
|
print("🎯 SUWood数据回调:")
|
|||
|
print(f" 数据: {json.dumps(data, ensure_ascii=False, indent=2)}")
|
|||
|
|
|||
|
|
|||
|
def save_suwood_data(data: Dict[str, Any]):
|
|||
|
"""保存SUWood数据的回调函数"""
|
|||
|
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|||
|
filename = f"suwood_data_{timestamp}.json"
|
|||
|
|
|||
|
try:
|
|||
|
with open(filename, 'w', encoding='utf-8') as f:
|
|||
|
json.dump(data, f, ensure_ascii=False, indent=2)
|
|||
|
print(f"💾 数据已保存到: {filename}")
|
|||
|
except Exception as e:
|
|||
|
print(f"❌ 保存数据失败: {e}")
|
|||
|
|
|||
|
# 工具函数
|
|||
|
|
|||
|
|
|||
|
def create_listener(host="127.0.0.1", port=7999) -> SUWoodDataListener:
|
|||
|
"""创建SUWood数据监听器"""
|
|||
|
return SUWoodDataListener(host, port)
|
|||
|
|
|||
|
|
|||
|
def start_monitoring(host="127.0.0.1", port=7999, save_to_file=True, print_data=True):
|
|||
|
"""开始监控SUWood服务器数据"""
|
|||
|
listener = create_listener(host, port)
|
|||
|
|
|||
|
# 添加回调函数
|
|||
|
if print_data:
|
|||
|
listener.add_callback(print_suwood_data)
|
|||
|
|
|||
|
if save_to_file:
|
|||
|
listener.add_callback(save_suwood_data)
|
|||
|
|
|||
|
# 开始监听
|
|||
|
listener.start_listening()
|
|||
|
|
|||
|
return listener
|
|||
|
|
|||
|
|
|||
|
if __name__ == "__main__":
|
|||
|
print("🎧 SUWood数据监听器")
|
|||
|
print("=" * 50)
|
|||
|
|
|||
|
# 创建监听器
|
|||
|
listener = start_monitoring()
|
|||
|
|
|||
|
try:
|
|||
|
print("⌨️ 按 Ctrl+C 停止监听...")
|
|||
|
while True:
|
|||
|
time.sleep(1)
|
|||
|
stats = listener.get_statistics()
|
|||
|
if stats['total_received'] > 0:
|
|||
|
print(
|
|||
|
f"📊 已接收 {stats['total_received']} 条数据,队列中有 {stats['queue_size']} 条待处理")
|
|||
|
|
|||
|
except KeyboardInterrupt:
|
|||
|
print("\n⛔ 停止监听...")
|
|||
|
listener.stop_listening()
|
|||
|
|
|||
|
# 显示最终统计
|
|||
|
final_stats = listener.get_statistics()
|
|||
|
print(f"📈 最终统计:")
|
|||
|
print(f" 总接收数据: {final_stats['total_received']} 条")
|
|||
|
print(f" 最后接收时间: {final_stats['last_receive_time']}")
|
|||
|
print("👋 监听结束")
|