import json
import os
import socket
import sys
import threading
from datetime import datetime

# Force UTF-8 on stdout/stderr (works around Windows console encoding issues).
if sys.platform.startswith('win'):
    sys.stdout.reconfigure(encoding='utf-8')
    sys.stderr.reconfigure(encoding='utf-8')


class JSONSocketServer:
    def __init__(self, host='localhost', port=8888):
        self.host = host
        self.port = port
        self.socket = None
        self.clients = []

    def start_server(self):
        """Start the server and accept client connections."""
        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.socket.bind((self.host, self.port))
            self.socket.listen(5)

            print("🚀 Server started successfully!")
            print(f"📍 Listening on: {self.host}:{self.port}")
            print(f"⏰ Start time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            print("✅ Waiting for client connections...")

            while True:
                client_socket, address = self.socket.accept()
                print(f"🔗 Client connected: {address}")

                # Handle each client in its own daemon thread.
                client_thread = threading.Thread(
                    target=self.handle_client,
                    args=(client_socket, address),
                    daemon=True,
                )
                client_thread.start()

        except Exception as e:
            print(f"❌ Server failed to start: {e}")
        finally:
            if self.socket:
                self.socket.close()

    def handle_client(self, client_socket, address):
        """Handle requests from a single client."""
        try:
            while True:
                # Receive a client message. Each recv() is treated as one
                # complete message; data longer than 4096 bytes or split
                # across TCP segments is not reassembled.
                data = client_socket.recv(4096).decode('utf-8')
                if not data:
                    break

                print(f"📨 Message received from {address}: {data}")

                # Parse the client request.
                try:
                    request = json.loads(data)
                except json.JSONDecodeError:
                    # Not JSON: treat it as a plain-text message.
                    response = {
                        "status": "success",
                        "message": f"Server received message: {data}",
                        "timestamp": datetime.now().isoformat(),
                        "client_address": f"{address[0]}:{address[1]}"
                    }
                else:
                    response = self.process_request(request, address)

                # Send the response; sendall() retries until every byte is written.
                response_json = json.dumps(response, ensure_ascii=False, indent=2)
                client_socket.sendall(response_json.encode('utf-8'))

        except Exception as e:
            print(f"❌ Error while handling client {address}: {e}")
        finally:
            client_socket.close()
            print(f"🔌 Client {address} disconnected")

    def process_request(self, request, address):
        """Process a JSON request from a client."""
        # Valid JSON that is not an object (e.g. a list) has no .get();
        # treat it as an unknown command instead of dropping the connection.
        if isinstance(request, dict):
            command = request.get('command', 'unknown')
        else:
            command = 'unknown'
            request = {}

        if command == 'get_file':
            # Send file contents.
            filename = request.get('filename', 'test_data.json')
            return self.send_file(filename, address)

        elif command == 'save_file':
            # Save a file.
            filename = request.get('filename', 'received_data.json')
            content = request.get('content', {})
            return self.save_file(filename, content, address)

        elif command == 'list_files':
            # List the JSON files in the current directory.
            json_files = [f for f in os.listdir('.') if f.endswith('.json')]
            return {
                "status": "success",
                "command": "list_files",
                "files": json_files,
                "count": len(json_files),
                "timestamp": datetime.now().isoformat(),
                "client_address": f"{address[0]}:{address[1]}"
            }

        elif command == 'ping':
            # Heartbeat test.
            return {
                "status": "success",
                "command": "ping",
                "message": "pong",
                "server_time": datetime.now().isoformat(),
                "client_address": f"{address[0]}:{address[1]}"
            }

        else:
            # Unknown command.
            return {
                "status": "error",
                "message": f"Unknown command: {command}",
                "available_commands": ["get_file", "save_file", "list_files", "ping"],
                "timestamp": datetime.now().isoformat(),
                "client_address": f"{address[0]}:{address[1]}"
            }

    def send_file(self, filename, address):
        """Return the contents of a JSON file."""
        # NOTE: filename comes from the client and is used as-is, so it can
        # point outside the current directory.
        try:
            if os.path.exists(filename):
                with open(filename, 'r', encoding='utf-8') as f:
                    content = json.load(f)

                return {
                    "status": "success",
                    "command": "get_file",
                    "filename": filename,
                    "content": content,
                    "size": os.path.getsize(filename),
                    "timestamp": datetime.now().isoformat(),
                    "client_address": f"{address[0]}:{address[1]}"
                }
            else:
                return {
                    "status": "error",
                    "command": "get_file",
                    "message": f"File {filename} does not exist",
                    "timestamp": datetime.now().isoformat(),
                    "client_address": f"{address[0]}:{address[1]}"
                }
        except Exception as e:
            return {
                "status": "error",
                "command": "get_file",
                "message": f"Failed to read file: {e}",
                "timestamp": datetime.now().isoformat(),
                "client_address": f"{address[0]}:{address[1]}"
            }

    def save_file(self, filename, content, address):
        """Save content to a JSON file."""
        # NOTE: filename comes from the client and is used as-is, so it can
        # point outside the current directory.
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(content, f, ensure_ascii=False, indent=2)

            return {
                "status": "success",
                "command": "save_file",
                "filename": filename,
                "message": f"File {filename} saved successfully",
                "size": os.path.getsize(filename),
                "timestamp": datetime.now().isoformat(),
                "client_address": f"{address[0]}:{address[1]}"
            }
        except Exception as e:
            return {
                "status": "error",
                "command": "save_file",
                "message": f"Failed to save file: {e}",
                "timestamp": datetime.now().isoformat(),
                "client_address": f"{address[0]}:{address[1]}"
            }


if __name__ == "__main__":
    server = JSONSocketServer()
    try:
        server.start_server()
    except KeyboardInterrupt:
        print("\n⛔ Server stopped")