suwoodblender/blenderpython/data_listener.py

373 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SUWood 数据监听器
用于获取从其他程序启动的SUWood服务器发送给suw_client的数据
"""
import socket
import json
import struct
import threading
import time
import queue
from datetime import datetime
from typing import Dict, Any, Optional, List, Callable
class SUWoodDataListener:
"""SUWood数据监听器 - 监听服务器发送的数据"""
def __init__(self, host="127.0.0.1", port=7999):
self.host = host
self.port = port
self.sock = None
self.running = False
self.listener_thread = None
self.data_queue = queue.Queue()
self.callbacks = [] # 数据接收回调函数列表
self.seqno = 0
# 数据统计
self.total_received = 0
self.last_receive_time = None
def add_callback(self, callback: Callable[[Dict[str, Any]], None]):
"""添加数据接收回调函数"""
self.callbacks.append(callback)
print(f"✅ 已添加数据回调函数: {callback.__name__}")
def remove_callback(self, callback: Callable[[Dict[str, Any]], None]):
"""移除数据接收回调函数"""
if callback in self.callbacks:
self.callbacks.remove(callback)
print(f"❌ 已移除数据回调函数: {callback.__name__}")
def connect(self) -> bool:
"""连接到SUWood服务器"""
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(10) # 设置连接超时
self.sock.connect((self.host, self.port))
print(f"✅ 成功连接到SUWood服务器 {self.host}:{self.port}")
return True
except Exception as e:
print(f"❌ 连接SUWood服务器失败: {e}")
self.sock = None
return False
def disconnect(self):
"""断开连接"""
if self.sock:
try:
self.sock.close()
except:
pass
self.sock = None
print("🔌 已断开连接")
def start_listening(self):
"""开始监听数据"""
if self.running:
print("⚠️ 监听器已在运行")
return
if not self.connect():
return
self.running = True
self.listener_thread = threading.Thread(
target=self._listen_loop, daemon=True)
self.listener_thread.start()
print("🎧 开始监听SUWood服务器数据...")
def stop_listening(self):
"""停止监听"""
self.running = False
self.disconnect()
if self.listener_thread:
self.listener_thread.join(timeout=3)
print("⛔ 数据监听已停止")
def _listen_loop(self):
"""监听循环"""
while self.running and self.sock:
try:
# 定期发送心跳命令以保持连接并获取数据
self._send_heartbeat()
# 接收服务器响应数据
data = self._receive_data()
if data:
self._process_received_data(data)
time.sleep(1) # 1秒间隔
except Exception as e:
print(f"❌ 监听过程中出错: {e}")
if self.running:
print("🔄 尝试重新连接...")
self.disconnect()
time.sleep(2)
if not self.connect():
break
def _send_heartbeat(self):
"""发送心跳命令获取数据"""
try:
# 发送获取命令列表的请求
msg = json.dumps({
"cmd": "get_cmds",
"params": {"from": "listener"}
})
# 基于测试结果,使用兼容协议 (0x01030001)
# 因为我们确认了接收端使用0x01030002响应
self._send_message_compat(0x01, msg)
except Exception as e:
print(f"❌ 发送心跳失败: {e}")
def _send_message(self, cmd: int, msg: str):
"""发送消息到服务器 - 标准协议"""
if not self.sock:
return False
try:
opcode = (cmd & 0xffff) | 0x01010000 # 标准协议: 0x01010001
self.seqno += 1
msg_bytes = msg.encode('utf-8')
header = struct.pack('iiii', len(msg_bytes), opcode, self.seqno, 0)
full_msg = header + msg_bytes
self.sock.send(full_msg)
return True
except Exception as e:
print(f"❌ 发送消息失败(标准协议): {e}")
return False
def _send_message_compat(self, cmd: int, msg: str):
"""发送消息到服务器 - 兼容协议"""
if not self.sock:
return False
try:
opcode = (cmd & 0xffff) | 0x01030000 # 兼容协议: 0x01030001
self.seqno += 1
msg_bytes = msg.encode('utf-8')
header = struct.pack('iiii', len(msg_bytes), opcode, self.seqno, 0)
full_msg = header + msg_bytes
self.sock.send(full_msg)
print(f"🔄 使用兼容协议发送消息: 0x{opcode:08x}")
return True
except Exception as e:
print(f"❌ 发送消息失败(兼容协议): {e}")
return False
def _receive_data(self) -> Optional[Dict[str, Any]]:
"""接收服务器数据"""
if not self.sock:
return None
try:
# 设置非阻塞模式,避免永久等待
self.sock.settimeout(1.0)
# 接收头部16字节
header = self.sock.recv(16)
if len(header) < 16:
return None
# 解包获取消息长度
msg_len, opcode, seqno, reserved = struct.unpack('iiii', header)
# 接收消息内容
msg = b""
to_recv_len = msg_len
while to_recv_len > 0:
chunk = self.sock.recv(min(to_recv_len, 4096))
if not chunk:
break
msg += chunk
to_recv_len = msg_len - len(msg)
if len(msg) == msg_len:
text_data = msg.decode('utf-8')
parsed_data = json.loads(text_data)
# 添加元数据
parsed_data['_meta'] = {
'opcode': opcode,
'seqno': seqno,
'reserved': reserved,
'receive_time': datetime.now().isoformat(),
'message_length': msg_len
}
return parsed_data
except socket.timeout:
# 超时是正常的,继续循环
pass
except Exception as e:
print(f"❌ 接收数据失败: {e}")
return None
def _process_received_data(self, data: Dict[str, Any]):
"""处理接收到的数据"""
self.total_received += 1
self.last_receive_time = datetime.now()
# 添加到队列
self.data_queue.put(data)
# 调用回调函数
for callback in self.callbacks:
try:
callback(data)
except Exception as e:
print(f"❌ 回调函数 {callback.__name__} 执行失败: {e}")
# 打印数据摘要
self._print_data_summary(data)
def _print_data_summary(self, data: Dict[str, Any]):
"""打印数据摘要"""
meta = data.get('_meta', {})
receive_time = meta.get('receive_time', 'unknown')
print(f"📥 [{receive_time}] 收到数据:")
print(f" 🔗 操作码: 0x{meta.get('opcode', 0):08x}")
print(f" 📝 序列号: {meta.get('seqno', 0)}")
print(f" 📊 数据大小: {meta.get('message_length', 0)} 字节")
# 打印主要数据内容
if 'ret' in data:
print(f" ✅ 返回状态: {data.get('ret')}")
if 'data' in data:
data_content = data.get('data', {})
if isinstance(data_content, dict):
print(f" 📋 数据内容: {len(data_content)} 个字段")
for key in list(data_content.keys())[:3]: # 显示前3个字段
print(f"{key}: {type(data_content[key]).__name__}")
print()
def get_latest_data(self) -> Optional[Dict[str, Any]]:
"""获取最新的数据(非阻塞)"""
try:
return self.data_queue.get_nowait()
except queue.Empty:
return None
def get_all_data(self) -> List[Dict[str, Any]]:
"""获取所有未处理的数据"""
data_list = []
while True:
try:
data_list.append(self.data_queue.get_nowait())
except queue.Empty:
break
return data_list
def wait_for_data(self, timeout: float = 10.0) -> Optional[Dict[str, Any]]:
"""等待接收数据(阻塞)"""
try:
return self.data_queue.get(timeout=timeout)
except queue.Empty:
return None
def get_statistics(self) -> Dict[str, Any]:
"""获取统计信息"""
return {
"total_received": self.total_received,
"last_receive_time": self.last_receive_time.isoformat() if self.last_receive_time else None,
"queue_size": self.data_queue.qsize(),
"is_running": self.running,
"is_connected": self.sock is not None,
"callbacks_count": len(self.callbacks)
}
# 回调函数示例
def print_suwood_data(data: Dict[str, Any]):
"""打印SUWood数据的回调函数"""
print("🎯 SUWood数据回调:")
print(f" 数据: {json.dumps(data, ensure_ascii=False, indent=2)}")
def save_suwood_data(data: Dict[str, Any]):
"""保存SUWood数据的回调函数"""
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"suwood_data_{timestamp}.json"
try:
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f"💾 数据已保存到: {filename}")
except Exception as e:
print(f"❌ 保存数据失败: {e}")
# 工具函数
def create_listener(host="127.0.0.1", port=7999) -> SUWoodDataListener:
"""创建SUWood数据监听器"""
return SUWoodDataListener(host, port)
def start_monitoring(host="127.0.0.1", port=7999, save_to_file=True, print_data=True):
"""开始监控SUWood服务器数据"""
listener = create_listener(host, port)
# 添加回调函数
if print_data:
listener.add_callback(print_suwood_data)
if save_to_file:
listener.add_callback(save_suwood_data)
# 开始监听
listener.start_listening()
return listener
if __name__ == "__main__":
print("🎧 SUWood数据监听器")
print("=" * 50)
# 创建监听器
listener = start_monitoring()
try:
print("⌨️ 按 Ctrl+C 停止监听...")
while True:
time.sleep(1)
stats = listener.get_statistics()
if stats['total_received'] > 0:
print(
f"📊 已接收 {stats['total_received']} 条数据,队列中有 {stats['queue_size']} 条待处理")
except KeyboardInterrupt:
print("\n⛔ 停止监听...")
listener.stop_listening()
# 显示最终统计
final_stats = listener.get_statistics()
print(f"📈 最终统计:")
print(f" 总接收数据: {final_stats['total_received']}")
print(f" 最后接收时间: {final_stats['last_receive_time']}")
print("👋 监听结束")