#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Socket JSON 服务器 - 编码安全版本 解决 Windows 控制台编码问题 """ import socket import json import threading import os import sys from datetime import datetime # 导入编码修复模块 try: from encoding_fix import safe_print, safe_format_message except ImportError: # 如果导入失败,使用简单的备用函数 def safe_print(text, fallback=None): try: print(text) except UnicodeEncodeError: print(text.encode('ascii', errors='ignore').decode('ascii')) def safe_format_message(text): # 简单替换emoji为文本 replacements = { "🚀": "[START]", "📍": "[ADDR]", "⏰": "[TIME]", "✅": "[OK]", "🔗": "[CONN]", "📨": "[MSG]", "❌": "[ERROR]", "🔌": "[DISC]", "📋": "[LIST]", "🖥️": "[SERVER]", "💻": "[CLIENT]", "📁": "[FILE]", "🎯": "[RESULT]", "⚠️": "[WARN]", "📊": "[REPORT]", "⛔": "[STOP]" } for emoji, replacement in replacements.items(): text = text.replace(emoji, replacement) return text class JSONSocketServerSafe: def __init__(self, host='localhost', port=8888): self.host = host self.port = port self.socket = None self.clients = [] def start_server(self): """启动服务器""" try: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind((self.host, self.port)) self.socket.listen(5) safe_print("🚀 服务器启动成功!", "[START] 服务器启动成功!") safe_print(f"📍 监听地址: {self.host}:{self.port}", f"[ADDR] 监听地址: {self.host}:{self.port}") safe_print(f"⏰ 启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", f"[TIME] 启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") safe_print("✅ 等待客户端连接...", "[OK] 等待客户端连接...") while True: client_socket, address = self.socket.accept() safe_print(f"🔗 客户端连接: {address}", f"[CONN] 客户端连接: {address}") # 为每个客户端创建独立线程 client_thread = threading.Thread( target=self.handle_client, args=(client_socket, address) ) client_thread.daemon = True client_thread.start() except Exception as e: safe_print(f"❌ 服务器启动失败: {e}", f"[ERROR] 服务器启动失败: {e}") finally: if self.socket: self.socket.close() def handle_client(self, client_socket, address): """处理客户端请求""" try: while True: # 接收客户端消息 data = client_socket.recv(4096).decode('utf-8') if not data: break safe_print(f"📨 收到来自 {address} 的消息: {data}", f"[MSG] 收到来自 {address} 的消息: {data}") # 解析客户端请求 try: request = json.loads(data) response = self.process_request(request, address) # 发送响应 response_json = json.dumps(response, ensure_ascii=False, indent=2) client_socket.send(response_json.encode('utf-8')) except json.JSONDecodeError: # 如果不是 JSON 格式,当作普通消息处理 response = { "status": "success", "message": f"服务器收到消息: {data}", "timestamp": datetime.now().isoformat(), "client_address": f"{address[0]}:{address[1]}" } response_json = json.dumps(response, ensure_ascii=False, indent=2) client_socket.send(response_json.encode('utf-8')) except Exception as e: safe_print(f"❌ 处理客户端 {address} 时出错: {e}", f"[ERROR] 处理客户端 {address} 时出错: {e}") finally: client_socket.close() safe_print(f"🔌 客户端 {address} 已断开连接", f"[DISC] 客户端 {address} 已断开连接") def process_request(self, request, address): """处理客户端的 JSON 请求""" command = request.get('command', 'unknown') if command == 'get_file': # 发送文件内容 filename = request.get('filename', 'test_data.json') return self.send_file(filename, address) elif command == 'save_file': # 保存文件 filename = request.get('filename', 'received_data.json') content = request.get('content', {}) return self.save_file(filename, content, address) elif command == 'list_files': # 列出当前目录的 JSON 文件 json_files = [f for f in os.listdir('.') if f.endswith('.json')] return { "status": "success", "command": "list_files", "files": json_files, "count": len(json_files), "timestamp": datetime.now().isoformat(), "client_address": f"{address[0]}:{address[1]}" } elif command == 'ping': # 心跳测试 return { "status": "success", "command": "ping", "message": "pong", "server_time": datetime.now().isoformat(), "client_address": f"{address[0]}:{address[1]}" } else: # 未知命令 return { "status": "error", "message": f"未知命令: {command}", "available_commands": ["get_file", "save_file", "list_files", "ping"], "timestamp": datetime.now().isoformat(), "client_address": f"{address[0]}:{address[1]}" } def send_file(self, filename, address): """发送文件内容""" try: if os.path.exists(filename): with open(filename, 'r', encoding='utf-8') as f: content = json.load(f) return { "status": "success", "command": "get_file", "filename": filename, "content": content, "size": os.path.getsize(filename), "timestamp": datetime.now().isoformat(), "client_address": f"{address[0]}:{address[1]}" } else: return { "status": "error", "command": "get_file", "message": f"文件 {filename} 不存在", "timestamp": datetime.now().isoformat(), "client_address": f"{address[0]}:{address[1]}" } except Exception as e: return { "status": "error", "command": "get_file", "message": f"读取文件失败: {str(e)}", "timestamp": datetime.now().isoformat(), "client_address": f"{address[0]}:{address[1]}" } def save_file(self, filename, content, address): """保存文件""" try: with open(filename, 'w', encoding='utf-8') as f: json.dump(content, f, ensure_ascii=False, indent=2) return { "status": "success", "command": "save_file", "filename": filename, "message": f"文件 {filename} 保存成功", "size": os.path.getsize(filename), "timestamp": datetime.now().isoformat(), "client_address": f"{address[0]}:{address[1]}" } except Exception as e: return { "status": "error", "command": "save_file", "message": f"保存文件失败: {str(e)}", "timestamp": datetime.now().isoformat(), "client_address": f"{address[0]}:{address[1]}" } if __name__ == "__main__": server = JSONSocketServerSafe() try: server.start_server() except KeyboardInterrupt: safe_print("\n⛔ 服务器已停止", "\n[STOP] 服务器已停止")