创建Socket JSON传输系统 - 包含服务器端、客户端、测试工具和完整文档

This commit is contained in:
Pei Xueke 2025-07-01 14:01:38 +08:00
commit 50e196ad9b
6 changed files with 1028 additions and 0 deletions

230
client.py Normal file
View File

@ -0,0 +1,230 @@
import socket
import json
import time
from datetime import datetime
class JSONSocketClient:
def __init__(self, host='localhost', port=8888):
self.host = host
self.port = port
self.socket = None
self.connected = False
def connect(self):
"""连接到服务器"""
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.host, self.port))
self.connected = True
print(f"✅ 已连接到服务器 {self.host}:{self.port}")
return True
except Exception as e:
print(f"❌ 连接失败: {e}")
return False
def disconnect(self):
"""断开连接"""
if self.socket:
self.socket.close()
self.connected = False
print("🔌 已断开连接")
def send_request(self, request):
"""发送请求并接收响应"""
if not self.connected:
print("❌ 未连接到服务器")
return None
try:
# 发送请求
request_json = json.dumps(request, ensure_ascii=False)
self.socket.send(request_json.encode('utf-8'))
print(f"📤 发送请求: {request_json}")
# 接收响应
response_data = self.socket.recv(4096).decode('utf-8')
response = json.loads(response_data)
print(f"📥 收到响应: {json.dumps(response, ensure_ascii=False, indent=2)}")
return response
except Exception as e:
print(f"❌ 请求失败: {e}")
return None
def send_message(self, message):
"""发送普通消息(非 JSON 格式)"""
if not self.connected:
print("❌ 未连接到服务器")
return None
try:
self.socket.send(message.encode('utf-8'))
print(f"📤 发送消息: {message}")
response_data = self.socket.recv(4096).decode('utf-8')
response = json.loads(response_data)
print(f"📥 收到响应: {json.dumps(response, ensure_ascii=False, indent=2)}")
return response
except Exception as e:
print(f"❌ 发送消息失败: {e}")
return None
def get_file(self, filename='test_data.json'):
"""从服务器获取文件"""
request = {
"command": "get_file",
"filename": filename,
"timestamp": datetime.now().isoformat()
}
return self.send_request(request)
def save_file(self, filename, content):
"""保存文件到服务器"""
request = {
"command": "save_file",
"filename": filename,
"content": content,
"timestamp": datetime.now().isoformat()
}
return self.send_request(request)
def list_files(self):
"""列出服务器上的 JSON 文件"""
request = {
"command": "list_files",
"timestamp": datetime.now().isoformat()
}
return self.send_request(request)
def ping(self):
"""测试服务器连接"""
request = {
"command": "ping",
"timestamp": datetime.now().isoformat()
}
return self.send_request(request)
def interactive_mode(self):
"""交互模式"""
print("\n🎮 进入交互模式")
print("可用命令:")
print(" 1. get <filename> - 获取文件")
print(" 2. save <filename> - 保存文件(会提示输入内容)")
print(" 3. list - 列出文件")
print(" 4. ping - 测试连接")
print(" 5. msg <message> - 发送普通消息")
print(" 6. quit - 退出")
print()
while self.connected:
try:
command = input("👉 请输入命令: ").strip()
if not command:
continue
parts = command.split(' ', 1)
cmd = parts[0].lower()
if cmd == 'quit':
break
elif cmd == 'get':
filename = parts[1] if len(parts) > 1 else 'test_data.json'
self.get_file(filename)
elif cmd == 'save':
filename = parts[1] if len(parts) > 1 else 'new_data.json'
print("请输入要保存的 JSON 内容(输入空行结束):")
lines = []
while True:
line = input()
if not line:
break
lines.append(line)
if lines:
try:
content = json.loads('\n'.join(lines))
self.save_file(filename, content)
except json.JSONDecodeError as e:
print(f"❌ JSON 格式错误: {e}")
elif cmd == 'list':
self.list_files()
elif cmd == 'ping':
self.ping()
elif cmd == 'msg':
message = parts[1] if len(parts) > 1 else "Hello Server!"
self.send_message(message)
else:
print(f"❌ 未知命令: {cmd}")
print() # 空行分隔
except KeyboardInterrupt:
break
except Exception as e:
print(f"❌ 命令执行错误: {e}")
print("👋 退出交互模式")
def test_client():
"""测试客户端功能"""
client = JSONSocketClient()
print("🧪 开始测试客户端...")
# 连接服务器
if not client.connect():
return
try:
# 测试 1: Ping
print("\n--- 测试 1: Ping ---")
client.ping()
# 测试 2: 列出文件
print("\n--- 测试 2: 列出文件 ---")
client.list_files()
# 测试 3: 获取文件
print("\n--- 测试 3: 获取文件 ---")
client.get_file('test_data.json')
# 测试 4: 保存文件
print("\n--- 测试 4: 保存文件 ---")
test_content = {
"message": "这是客户端测试数据",
"timestamp": datetime.now().isoformat(),
"data": {
"name": "测试用户",
"age": 25,
"hobbies": ["编程", "音乐", "阅读"]
}
}
client.save_file('client_test.json', test_content)
# 测试 5: 发送普通消息
print("\n--- 测试 5: 发送普通消息 ---")
client.send_message("Hello Server! This is a test message.")
print("\n✅ 所有测试完成!")
finally:
client.disconnect()
if __name__ == "__main__":
import sys
if len(sys.argv) > 1 and sys.argv[1] == 'test':
# 运行测试
test_client()
else:
# 交互模式
client = JSONSocketClient()
if client.connect():
try:
client.interactive_mode()
finally:
client.disconnect()

191
server.py Normal file
View File

@ -0,0 +1,191 @@
import socket
import json
import threading
import os
from datetime import datetime
class JSONSocketServer:
def __init__(self, host='localhost', port=8888):
self.host = host
self.port = port
self.socket = None
self.clients = []
def start_server(self):
"""启动服务器"""
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind((self.host, self.port))
self.socket.listen(5)
print(f"🚀 服务器启动成功!")
print(f"📍 监听地址: {self.host}:{self.port}")
print(f"⏰ 启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"✅ 等待客户端连接...")
while True:
client_socket, address = self.socket.accept()
print(f"🔗 客户端连接: {address}")
# 为每个客户端创建独立线程
client_thread = threading.Thread(
target=self.handle_client,
args=(client_socket, address)
)
client_thread.daemon = True
client_thread.start()
except Exception as e:
print(f"❌ 服务器启动失败: {e}")
finally:
if self.socket:
self.socket.close()
def handle_client(self, client_socket, address):
"""处理客户端请求"""
try:
while True:
# 接收客户端消息
data = client_socket.recv(4096).decode('utf-8')
if not data:
break
print(f"📨 收到来自 {address} 的消息: {data}")
# 解析客户端请求
try:
request = json.loads(data)
response = self.process_request(request, address)
# 发送响应
response_json = json.dumps(response, ensure_ascii=False, indent=2)
client_socket.send(response_json.encode('utf-8'))
except json.JSONDecodeError:
# 如果不是 JSON 格式,当作普通消息处理
response = {
"status": "success",
"message": f"服务器收到消息: {data}",
"timestamp": datetime.now().isoformat(),
"client_address": f"{address[0]}:{address[1]}"
}
response_json = json.dumps(response, ensure_ascii=False, indent=2)
client_socket.send(response_json.encode('utf-8'))
except Exception as e:
print(f"❌ 处理客户端 {address} 时出错: {e}")
finally:
client_socket.close()
print(f"🔌 客户端 {address} 已断开连接")
def process_request(self, request, address):
"""处理客户端的 JSON 请求"""
command = request.get('command', 'unknown')
if command == 'get_file':
# 发送文件内容
filename = request.get('filename', 'test_data.json')
return self.send_file(filename, address)
elif command == 'save_file':
# 保存文件
filename = request.get('filename', 'received_data.json')
content = request.get('content', {})
return self.save_file(filename, content, address)
elif command == 'list_files':
# 列出当前目录的 JSON 文件
json_files = [f for f in os.listdir('.') if f.endswith('.json')]
return {
"status": "success",
"command": "list_files",
"files": json_files,
"count": len(json_files),
"timestamp": datetime.now().isoformat(),
"client_address": f"{address[0]}:{address[1]}"
}
elif command == 'ping':
# 心跳测试
return {
"status": "success",
"command": "ping",
"message": "pong",
"server_time": datetime.now().isoformat(),
"client_address": f"{address[0]}:{address[1]}"
}
else:
# 未知命令
return {
"status": "error",
"message": f"未知命令: {command}",
"available_commands": ["get_file", "save_file", "list_files", "ping"],
"timestamp": datetime.now().isoformat(),
"client_address": f"{address[0]}:{address[1]}"
}
def send_file(self, filename, address):
"""发送文件内容"""
try:
if os.path.exists(filename):
with open(filename, 'r', encoding='utf-8') as f:
content = json.load(f)
return {
"status": "success",
"command": "get_file",
"filename": filename,
"content": content,
"size": os.path.getsize(filename),
"timestamp": datetime.now().isoformat(),
"client_address": f"{address[0]}:{address[1]}"
}
else:
return {
"status": "error",
"command": "get_file",
"message": f"文件 {filename} 不存在",
"timestamp": datetime.now().isoformat(),
"client_address": f"{address[0]}:{address[1]}"
}
except Exception as e:
return {
"status": "error",
"command": "get_file",
"message": f"读取文件失败: {str(e)}",
"timestamp": datetime.now().isoformat(),
"client_address": f"{address[0]}:{address[1]}"
}
def save_file(self, filename, content, address):
"""保存文件"""
try:
with open(filename, 'w', encoding='utf-8') as f:
json.dump(content, f, ensure_ascii=False, indent=2)
return {
"status": "success",
"command": "save_file",
"filename": filename,
"message": f"文件 {filename} 保存成功",
"size": os.path.getsize(filename),
"timestamp": datetime.now().isoformat(),
"client_address": f"{address[0]}:{address[1]}"
}
except Exception as e:
return {
"status": "error",
"command": "save_file",
"message": f"保存文件失败: {str(e)}",
"timestamp": datetime.now().isoformat(),
"client_address": f"{address[0]}:{address[1]}"
}
if __name__ == "__main__":
server = JSONSocketServer()
try:
server.start_server()
except KeyboardInterrupt:
print("\n⛔ 服务器已停止")

64
test.bat Normal file
View File

@ -0,0 +1,64 @@
@echo off
chcp 65001 >nul
echo.
echo ===============================================
echo Socket JSON 传输系统 - Windows 测试工具
echo ===============================================
echo.
REM 检查 Python 是否安装
python --version >nul 2>&1
if errorlevel 1 (
echo ❌ 错误: 未找到 Python请先安装 Python
echo 下载地址: https://www.python.org/downloads/
pause
exit /b 1
)
echo ✅ Python 已安装
python --version
echo.
REM 检查必需文件
echo 📋 检查必需文件...
set "missing_files="
if not exist server.py set "missing_files=%missing_files% server.py"
if not exist client.py set "missing_files=%missing_files% client.py"
if not exist test_data.json set "missing_files=%missing_files% test_data.json"
if not exist test_socket.py set "missing_files=%missing_files% test_socket.py"
if not "%missing_files%"=="" (
echo ❌ 缺少文件:%missing_files%
echo 请确保所有文件都在当前目录中
pause
exit /b 1
)
echo ✅ 所有必需文件都存在
echo.
REM 运行测试
echo 🚀 开始运行自动测试...
echo.
python test_socket.py
echo.
echo ===============================================
echo 测试完成!查看上方输出了解测试结果
echo ===============================================
echo.
REM 显示生成的文件
echo 📁 生成的文件:
if exist client_test.json echo - client_test.json (客户端测试数据)
if exist test_report.json echo - test_report.json (详细测试报告)
if exist received_data.json echo - received_data.json (服务器接收的数据)
echo.
echo 💡 提示:
echo 1. 如果需要手动测试,运行: python server.py
echo 2. 然后在另一个窗口运行: python client.py
echo 3. 查看详细测试报告: notepad test_report.json
echo.
pause

41
test_data.json Normal file
View File

@ -0,0 +1,41 @@
{
"project": "Socket JSON 传输测试",
"version": "1.0.0",
"description": "这是一个用于测试 Socket JSON 传输的示例文件",
"timestamp": "2024-01-01T00:00:00",
"author": {
"name": "开发者",
"email": "developer@example.com"
},
"config": {
"server": {
"host": "localhost",
"port": 8888,
"max_connections": 5
},
"client": {
"timeout": 30,
"retry_count": 3
}
},
"test_data": {
"numbers": [1, 2, 3, 4, 5],
"strings": ["hello", "world", "测试", "数据"],
"boolean": true,
"null_value": null,
"nested_object": {
"level1": {
"level2": {
"message": "这是嵌套的 JSON 对象"
}
}
}
},
"commands": [
"get_file",
"save_file",
"list_files",
"ping"
],
"status": "ready"
}

218
test_socket.py Normal file
View File

@ -0,0 +1,218 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Socket JSON 传输系统自动测试脚本
用于快速验证服务器和客户端功能
"""
import subprocess
import time
import threading
import os
import sys
import json
from datetime import datetime
class SocketTester:
def __init__(self):
self.server_process = None
self.test_results = []
def log(self, message, level="INFO"):
"""记录测试日志"""
timestamp = datetime.now().strftime("%H:%M:%S")
print(f"[{timestamp}] {level}: {message}")
self.test_results.append(f"[{timestamp}] {level}: {message}")
def start_server(self):
"""启动服务器"""
try:
self.log("正在启动服务器...")
self.server_process = subprocess.Popen(
[sys.executable, "server.py"],
cwd=os.getcwd(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# 等待服务器启动
time.sleep(2)
if self.server_process.poll() is None:
self.log("✅ 服务器启动成功", "SUCCESS")
return True
else:
stdout, stderr = self.server_process.communicate()
self.log(f"❌ 服务器启动失败: {stderr}", "ERROR")
return False
except Exception as e:
self.log(f"❌ 启动服务器时出错: {e}", "ERROR")
return False
def stop_server(self):
"""停止服务器"""
if self.server_process:
try:
self.server_process.terminate()
self.server_process.wait(timeout=5)
self.log("⛔ 服务器已停止", "INFO")
except subprocess.TimeoutExpired:
self.server_process.kill()
self.log("⛔ 强制停止服务器", "WARNING")
except Exception as e:
self.log(f"❌ 停止服务器时出错: {e}", "ERROR")
def test_client_commands(self):
"""测试客户端命令"""
try:
self.log("开始测试客户端功能...")
# 运行客户端测试
result = subprocess.run(
[sys.executable, "client.py", "test"],
cwd=os.getcwd(),
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
self.log("✅ 客户端测试完成", "SUCCESS")
self.log("客户端输出:", "DEBUG")
for line in result.stdout.split('\n'):
if line.strip():
self.log(f" {line}", "DEBUG")
return True
else:
self.log(f"❌ 客户端测试失败: {result.stderr}", "ERROR")
return False
except subprocess.TimeoutExpired:
self.log("❌ 客户端测试超时", "ERROR")
return False
except Exception as e:
self.log(f"❌ 客户端测试出错: {e}", "ERROR")
return False
def verify_files(self):
"""验证文件是否正确创建"""
self.log("验证测试文件...")
required_files = ["server.py", "client.py", "test_data.json"]
missing_files = []
for file in required_files:
if not os.path.exists(file):
missing_files.append(file)
if missing_files:
self.log(f"❌ 缺少文件: {', '.join(missing_files)}", "ERROR")
return False
else:
self.log("✅ 所有必需文件都存在", "SUCCESS")
return True
def run_full_test(self):
"""运行完整测试"""
self.log("🚀 开始 Socket JSON 传输系统完整测试", "INFO")
self.log("=" * 50, "INFO")
success_count = 0
total_tests = 4
try:
# 测试 1: 验证文件
self.log("\n📋 测试 1/4: 验证文件存在性", "INFO")
if self.verify_files():
success_count += 1
# 测试 2: 启动服务器
self.log("\n🖥️ 测试 2/4: 启动服务器", "INFO")
if self.start_server():
success_count += 1
# 测试 3: 客户端功能测试
self.log("\n💻 测试 3/4: 客户端功能测试", "INFO")
if self.test_client_commands():
success_count += 1
# 测试 4: 文件传输验证
self.log("\n📁 测试 4/4: 验证文件传输", "INFO")
if os.path.exists("client_test.json"):
self.log("✅ 发现客户端创建的测试文件", "SUCCESS")
try:
with open("client_test.json", 'r', encoding='utf-8') as f:
data = json.load(f)
self.log("✅ JSON 文件格式正确", "SUCCESS")
self.log(f"文件内容预览: {data.get('message', 'N/A')}", "DEBUG")
success_count += 1
except Exception as e:
self.log(f"❌ JSON 文件格式错误: {e}", "ERROR")
else:
self.log("❌ 未找到客户端测试文件", "ERROR")
finally:
self.stop_server()
# 测试结果总结
self.log("\n" + "=" * 50, "INFO")
self.log(f"🎯 测试完成: {success_count}/{total_tests} 项通过",
"SUCCESS" if success_count == total_tests else "WARNING")
if success_count == total_tests:
self.log("🎉 所有测试通过Socket JSON 传输系统工作正常", "SUCCESS")
else:
self.log("⚠️ 部分测试失败,请检查上述错误信息", "WARNING")
return success_count == total_tests
def save_test_report(self):
"""保存测试报告"""
report = {
"test_time": datetime.now().isoformat(),
"test_results": self.test_results,
"summary": f"测试完成,详细日志共 {len(self.test_results)}"
}
with open("test_report.json", 'w', encoding='utf-8') as f:
json.dump(report, f, ensure_ascii=False, indent=2)
self.log("📊 测试报告已保存到 test_report.json", "INFO")
def main():
"""主函数"""
print("🧪 Socket JSON 传输系统自动测试工具")
print("=" * 50)
if len(sys.argv) > 1:
command = sys.argv[1].lower()
if command == "help":
print("使用方法:")
print(" python test_socket.py - 运行完整测试")
print(" python test_socket.py full - 运行完整测试")
print(" python test_socket.py help - 显示帮助")
return
tester = SocketTester()
try:
success = tester.run_full_test()
tester.save_test_report()
if success:
print("\n🎊 测试全部通过!您的 Socket JSON 传输系统运行正常!")
else:
print("\n🔧 测试发现问题,请查看上述日志进行修复")
except KeyboardInterrupt:
print("\n⛔ 测试被用户中断")
tester.stop_server()
except Exception as e:
print(f"\n❌ 测试过程中出现异常: {e}")
tester.stop_server()
if __name__ == "__main__":
main()

284
使用说明.md Normal file
View File

@ -0,0 +1,284 @@
# Socket JSON 传输系统使用说明
## 📋 系统概述
这是一个基于 Python Socket 的 JSON 文件传输系统,包含完整的服务器端和客户端实现。系统支持:
- ✅ JSON 格式文件的双向传输
- ✅ 多客户端并发连接
- ✅ 完整的命令系统(获取文件、保存文件、列出文件、心跳测试)
- ✅ 实时状态监控和错误处理
- ✅ 自动化测试工具
## 📁 文件结构
```
blender/
├── server.py # 服务器端程序
├── client.py # 客户端程序
├── test_data.json # 测试用 JSON 文件
├── test_socket.py # 自动测试脚本
├── test.bat # Windows 快速测试工具
└── 使用说明.md # 本说明文档
```
## 🚀 快速开始
### 方法一:自动测试(推荐)
在 Windows 系统下:
```bash
# 双击运行
test.bat
# 或在命令行运行
python test_socket.py
```
### 方法二:手动测试
1. **启动服务器**
```bash
python server.py
```
服务器启动后会显示:
```
🚀 服务器启动成功!
📍 监听地址: localhost:8888
⏰ 启动时间: 2024-xx-xx xx:xx:xx
✅ 等待客户端连接...
```
2. **在另一个终端窗口启动客户端**
```bash
# 交互模式
python client.py
# 或运行测试模式
python client.py test
```
## 🎮 客户端交互命令
进入客户端交互模式后,可以使用以下命令:
| 命令 | 说明 | 示例 |
|------|------|------|
| `get <filename>` | 从服务器获取文件 | `get test_data.json` |
| `save <filename>` | 保存文件到服务器 | `save my_data.json` |
| `list` | 列出服务器上的 JSON 文件 | `list` |
| `ping` | 测试服务器连接 | `ping` |
| `msg <message>` | 发送普通文本消息 | `msg Hello Server!` |
| `quit` | 退出客户端 | `quit` |
### 保存文件示例
当使用 `save` 命令时,系统会提示输入 JSON 内容:
```
👉 请输入命令: save my_test.json
请输入要保存的 JSON 内容(输入空行结束):
{
"name": "测试用户",
"age": 25,
"message": "这是测试数据"
}
# 输入空行结束
```
## 🧪 测试方法详解
### 1. 自动化测试
运行 `python test_socket.py` 会执行以下测试:
- ✅ **文件检查**:验证所有必需文件是否存在
- ✅ **服务器启动**:自动启动服务器并检查状态
- ✅ **客户端功能**:测试所有客户端命令
- ✅ **文件传输**:验证 JSON 文件的正确传输
### 2. 手动功能测试
**测试步骤:**
1. **基础连接测试**
```bash
# 终端1启动服务器
python server.py
# 终端2连接客户端
python client.py
```
2. **心跳测试**
```
👉 请输入命令: ping
```
预期响应:
```json
{
"status": "success",
"command": "ping",
"message": "pong",
"server_time": "2024-xx-xxTxx:xx:xx",
"client_address": "127.0.0.1:xxxxx"
}
```
3. **文件列表测试**
```
👉 请输入命令: list
```
预期响应:
```json
{
"status": "success",
"command": "list_files",
"files": ["test_data.json"],
"count": 1
}
```
4. **文件获取测试**
```
👉 请输入命令: get test_data.json
```
预期响应:
```json
{
"status": "success",
"command": "get_file",
"filename": "test_data.json",
"content": { ... },
"size": 1234
}
```
5. **文件保存测试**
```
👉 请输入命令: save new_file.json
请输入要保存的 JSON 内容(输入空行结束):
{"test": "data"}
# 输入空行
```
6. **普通消息测试**
```
👉 请输入命令: msg 你好服务器!
```
### 3. 多客户端并发测试
服务器支持多个客户端同时连接:
```bash
# 终端1服务器
python server.py
# 终端2客户端1
python client.py
# 终端3客户端2
python client.py
# 终端4客户端3
python client.py
```
## 📊 测试结果验证
### 成功指标
1. **服务器端日志**
```
🚀 服务器启动成功!
🔗 客户端连接: ('127.0.0.1', 端口号)
📨 收到来自 ('127.0.0.1', 端口号) 的消息: {...}
```
2. **客户端端日志**
```
✅ 已连接到服务器 localhost:8888
📤 发送请求: {...}
📥 收到响应: {...}
```
3. **生成的文件**
- `client_test.json` - 客户端测试时创建的文件
- `test_report.json` - 自动测试生成的详细报告
### 常见问题排查
1. **连接失败**
```
❌ 连接失败: [Errno 10061] No connection could be made
```
**解决方案**:确保服务器已启动,检查端口 8888 是否被占用
2. **文件不存在**
```json
{
"status": "error",
"message": "文件 xxx.json 不存在"
}
```
**解决方案**:检查文件名是否正确,确保文件在服务器目录中
3. **JSON 格式错误**
```
❌ JSON 格式错误: Expecting ',' delimiter
```
**解决方案**:检查 JSON 语法,使用在线 JSON 验证工具
## 🔧 高级配置
### 修改服务器配置
`server.py` 中修改:
```python
# 修改监听地址和端口
server = JSONSocketServer(host='0.0.0.0', port=9999)
```
### 修改客户端配置
`client.py` 中修改:
```python
# 连接到不同的服务器
client = JSONSocketClient(host='192.168.1.100', port=9999)
```
## 🛡️ 安全注意事项
- 此系统仅用于学习和测试,不适用于生产环境
- 服务器默认只监听 localhost如需远程访问请谨慎配置
- 传输的数据未加密,敏感信息请使用 HTTPS 或其他加密方案
## 📚 扩展功能
系统支持轻松扩展:
1. **添加新命令**:在 `server.py``process_request` 方法中添加新的命令处理
2. **文件验证**:添加文件类型检查、大小限制等
3. **用户认证**:实现简单的用户名密码验证
4. **数据压缩**:对大文件进行压缩传输
5. **传输加密**:使用 SSL/TLS 加密连接
## 🎯 总结
这个 Socket JSON 传输系统提供了:
- 🔧 **完整功能**:文件传输、列表查看、心跳检测
- 🚀 **易于使用**:一键测试,交互式命令行
- 🛠️ **可扩展性**:清晰的代码结构,便于功能扩展
- 📊 **完善测试**:自动化测试工具,详细的测试报告
通过运行 `test.bat`Windows`python test_socket.py` 开始您的测试之旅!