#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ SUWood数据桌面保存器 将实时接收到的SUWood服务器数据转换成JSON格式,保存到桌面001.json文件 """ from data_listener import create_listener import os import sys import json import time import threading from datetime import datetime from typing import Dict, Any, List # 添加当前目录到路径 current_dir = os.path.dirname(os.path.abspath(__file__)) if current_dir not in sys.path: sys.path.insert(0, current_dir) class DesktopDataSaver: """桌面数据保存器""" def __init__(self, server_host="127.0.0.1", server_port=7999): self.listener = None self.server_host = server_host self.server_port = server_port # 获取桌面路径 self.desktop_path = self._get_desktop_path() self.json_file_path = os.path.join(self.desktop_path, "001.json") # 数据存储 self.collected_data = [] self.last_save_time = None # 保存控制 self.auto_save_interval = 2 # 2秒自动保存一次 self.max_data_count = 1000 # 最多保存1000条数据 print(f"📁 JSON文件路径: {self.json_file_path}") def _get_desktop_path(self) -> str: """获取桌面路径""" # Windows if os.name == 'nt': desktop = os.path.join(os.path.expanduser('~'), 'Desktop') if os.path.exists(desktop): return desktop # 中文系统可能是"桌面" desktop_cn = os.path.join(os.path.expanduser('~'), '桌面') if os.path.exists(desktop_cn): return desktop_cn # macOS/Linux desktop = os.path.join(os.path.expanduser('~'), 'Desktop') if os.path.exists(desktop): return desktop # 如果都找不到,使用当前目录 print("⚠️ 未找到桌面目录,将保存到当前目录") return os.getcwd() def start_monitoring(self): """开始监控并保存数据""" print(f"🚀 开始监控SUWood服务器数据: {self.server_host}:{self.server_port}") print(f"💾 数据将保存到: {self.json_file_path}") # 创建监听器 self.listener = create_listener(self.server_host, self.server_port) # 添加数据处理回调 self.listener.add_callback(self.on_data_received) # 开始监听 self.listener.start_listening() # 启动自动保存线程 self._start_auto_save_thread() def stop_monitoring(self): """停止监控""" if self.listener: self.listener.stop_listening() # 最后保存一次数据 self._save_to_json() print("⛔ 数据监控已停止") def on_data_received(self, data: Dict[str, Any]): """处理接收到的数据""" # 添加时间戳 processed_data = { "timestamp": datetime.now().isoformat(), "raw_data": data } # 添加到收集列表 self.collected_data.append(processed_data) # 保持数据量在限制范围内 if len(self.collected_data) > self.max_data_count: self.collected_data.pop(0) # 移除最旧的数据 # 打印接收信息 data_size = len(json.dumps(data, ensure_ascii=False)) print( f"📥 [{datetime.now().strftime('%H:%M:%S')}] 收到数据: {data_size} 字节,总计: {len(self.collected_data)} 条") # 如果数据包含重要信息,立即保存 if self._is_important_data(data): print("⚡ 检测到重要数据,立即保存...") self._save_to_json() def _is_important_data(self, data: Dict[str, Any]) -> bool: """判断是否为重要数据(需要立即保存)""" # 检查是否包含几何数据 if 'data' in data: data_content = data['data'] important_fields = ['meshes', 'objects', 'vertices', 'faces', 'cmds'] return any(field in data_content for field in important_fields) # 检查是否为成功的命令响应 if data.get('ret') == 1: return True return False def _start_auto_save_thread(self): """启动自动保存线程""" def auto_save_loop(): while self.listener and self.listener.running: time.sleep(self.auto_save_interval) if self.collected_data: self._save_to_json() save_thread = threading.Thread(target=auto_save_loop, daemon=True) save_thread.start() print(f"🔄 自动保存线程已启动(间隔: {self.auto_save_interval}秒)") def _save_to_json(self): """保存数据到JSON文件""" if not self.collected_data: return try: # 准备保存的数据结构 save_data = { "save_info": { "save_time": datetime.now().isoformat(), "total_records": len(self.collected_data), "data_source": f"{self.server_host}:{self.server_port}", "file_version": "1.0" }, "data_records": self.collected_data } # 写入JSON文件 with open(self.json_file_path, 'w', encoding='utf-8') as f: json.dump(save_data, f, ensure_ascii=False, indent=2) self.last_save_time = datetime.now() file_size = os.path.getsize(self.json_file_path) print( f"💾 数据已保存: {len(self.collected_data)} 条记录,文件大小: {file_size/1024:.1f}KB") except Exception as e: print(f"❌ 保存JSON文件失败: {e}") def get_statistics(self) -> Dict[str, Any]: """获取统计信息""" file_exists = os.path.exists(self.json_file_path) file_size = os.path.getsize(self.json_file_path) if file_exists else 0 stats = { "collected_records": len(self.collected_data), "json_file_path": self.json_file_path, "json_file_exists": file_exists, "json_file_size_kb": file_size / 1024 if file_size > 0 else 0, "last_save_time": self.last_save_time.isoformat() if self.last_save_time else None, "listener_running": self.listener.running if self.listener else False } if self.listener: listener_stats = self.listener.get_statistics() stats.update({ "total_received": listener_stats.get("total_received", 0), "queue_size": listener_stats.get("queue_size", 0), "is_connected": listener_stats.get("is_connected", False) }) return stats def manually_save(self): """手动保存数据""" print("🖱️ 手动保存数据...") self._save_to_json() def clear_data(self): """清空收集的数据""" self.collected_data.clear() print("🗑️ 已清空收集的数据") def load_existing_data(self) -> bool: """加载已存在的JSON文件数据""" if not os.path.exists(self.json_file_path): print("📄 桌面上没有找到001.json文件") return False try: with open(self.json_file_path, 'r', encoding='utf-8') as f: existing_data = json.load(f) if 'data_records' in existing_data: self.collected_data = existing_data['data_records'] print(f"📂 已加载现有数据: {len(self.collected_data)} 条记录") return True else: print("⚠️ 现有JSON文件格式不匹配") return False except Exception as e: print(f"❌ 加载现有数据失败: {e}") return False def main(): """主函数""" print("💾 SUWood数据桌面保存器") print("=" * 50) print("功能: 实时接收SUWood服务器数据并保存到桌面001.json") print() # 创建数据保存器 saver = DesktopDataSaver() # 询问是否加载现有数据 if os.path.exists(saver.json_file_path): response = input( "📂 发现桌面已存在001.json文件,是否加载现有数据?(y/n): ").strip().lower() if response == 'y': saver.load_existing_data() try: # 开始监控 saver.start_monitoring() print() print("⌨️ 控制命令:") print(" - 按 Enter 查看统计信息") print(" - 输入 's' 手动保存数据") print(" - 输入 'c' 清空收集的数据") print(" - 输入 'q' 或 Ctrl+C 退出") print() print("📡 等待SUWood服务器数据...") # 主循环 while True: try: user_input = input().strip().lower() if user_input == 'q': break elif user_input == 's': saver.manually_save() elif user_input == 'c': saver.clear_data() else: # 显示统计信息 stats = saver.get_statistics() print() print("📊 统计信息:") print(f" 📥 收集记录: {stats['collected_records']} 条") print(f" 📡 接收总数: {stats.get('total_received', 0)} 条") print(f" 📁 文件大小: {stats['json_file_size_kb']:.1f} KB") print(f" 💾 最后保存: {stats['last_save_time'] or '未保存'}") print( f" 🔗 连接状态: {'已连接' if stats.get('is_connected') else '未连接'}") print() except EOFError: # Ctrl+D break except KeyboardInterrupt: print("\n⛔ 收到中断信号...") finally: # 停止监控并保存数据 print("💾 正在保存最终数据...") saver.stop_monitoring() # 显示最终统计 final_stats = saver.get_statistics() print() print("📈 最终统计:") print(f" 📥 总收集记录: {final_stats['collected_records']} 条") print(f" 📁 JSON文件: {final_stats['json_file_path']}") print(f" 💾 文件大小: {final_stats['json_file_size_kb']:.1f} KB") print() print("👋 程序结束,数据已保存到桌面001.json") if __name__ == "__main__": main()