suwoodblender/test_socket.py

224 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Socket JSON 传输系统自动测试脚本
用于快速验证服务器和客户端功能
"""
import subprocess
import time
import threading
import os
import sys
import json
from datetime import datetime
# 设置标准输出编码为 UTF-8解决 Windows 控制台编码问题)
if sys.platform.startswith('win'):
import codecs
sys.stdout = codecs.getwriter('utf-8')(sys.stdout.detach())
sys.stderr = codecs.getwriter('utf-8')(sys.stderr.detach())
class SocketTester:
def __init__(self):
self.server_process = None
self.test_results = []
def log(self, message, level="INFO"):
"""记录测试日志"""
timestamp = datetime.now().strftime("%H:%M:%S")
print(f"[{timestamp}] {level}: {message}")
self.test_results.append(f"[{timestamp}] {level}: {message}")
def start_server(self):
"""启动服务器"""
try:
self.log("正在启动服务器...")
self.server_process = subprocess.Popen(
[sys.executable, "server.py"],
cwd=os.getcwd(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# 等待服务器启动
time.sleep(2)
if self.server_process.poll() is None:
self.log("✅ 服务器启动成功", "SUCCESS")
return True
else:
stdout, stderr = self.server_process.communicate()
self.log(f"❌ 服务器启动失败: {stderr}", "ERROR")
return False
except Exception as e:
self.log(f"❌ 启动服务器时出错: {e}", "ERROR")
return False
def stop_server(self):
"""停止服务器"""
if self.server_process:
try:
self.server_process.terminate()
self.server_process.wait(timeout=5)
self.log("⛔ 服务器已停止", "INFO")
except subprocess.TimeoutExpired:
self.server_process.kill()
self.log("⛔ 强制停止服务器", "WARNING")
except Exception as e:
self.log(f"❌ 停止服务器时出错: {e}", "ERROR")
def test_client_commands(self):
"""测试客户端命令"""
try:
self.log("开始测试客户端功能...")
# 运行客户端测试
result = subprocess.run(
[sys.executable, "client.py", "test"],
cwd=os.getcwd(),
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
self.log("✅ 客户端测试完成", "SUCCESS")
self.log("客户端输出:", "DEBUG")
for line in result.stdout.split('\n'):
if line.strip():
self.log(f" {line}", "DEBUG")
return True
else:
self.log(f"❌ 客户端测试失败: {result.stderr}", "ERROR")
return False
except subprocess.TimeoutExpired:
self.log("❌ 客户端测试超时", "ERROR")
return False
except Exception as e:
self.log(f"❌ 客户端测试出错: {e}", "ERROR")
return False
def verify_files(self):
"""验证文件是否正确创建"""
self.log("验证测试文件...")
required_files = ["server.py", "client.py", "test_data.json"]
missing_files = []
for file in required_files:
if not os.path.exists(file):
missing_files.append(file)
if missing_files:
self.log(f"❌ 缺少文件: {', '.join(missing_files)}", "ERROR")
return False
else:
self.log("✅ 所有必需文件都存在", "SUCCESS")
return True
def run_full_test(self):
"""运行完整测试"""
self.log("🚀 开始 Socket JSON 传输系统完整测试", "INFO")
self.log("=" * 50, "INFO")
success_count = 0
total_tests = 4
try:
# 测试 1: 验证文件
self.log("\n📋 测试 1/4: 验证文件存在性", "INFO")
if self.verify_files():
success_count += 1
# 测试 2: 启动服务器
self.log("\n🖥️ 测试 2/4: 启动服务器", "INFO")
if self.start_server():
success_count += 1
# 测试 3: 客户端功能测试
self.log("\n💻 测试 3/4: 客户端功能测试", "INFO")
if self.test_client_commands():
success_count += 1
# 测试 4: 文件传输验证
self.log("\n📁 测试 4/4: 验证文件传输", "INFO")
if os.path.exists("client_test.json"):
self.log("✅ 发现客户端创建的测试文件", "SUCCESS")
try:
with open("client_test.json", 'r', encoding='utf-8') as f:
data = json.load(f)
self.log("✅ JSON 文件格式正确", "SUCCESS")
self.log(f"文件内容预览: {data.get('message', 'N/A')}", "DEBUG")
success_count += 1
except Exception as e:
self.log(f"❌ JSON 文件格式错误: {e}", "ERROR")
else:
self.log("❌ 未找到客户端测试文件", "ERROR")
finally:
self.stop_server()
# 测试结果总结
self.log("\n" + "=" * 50, "INFO")
self.log(f"🎯 测试完成: {success_count}/{total_tests} 项通过",
"SUCCESS" if success_count == total_tests else "WARNING")
if success_count == total_tests:
self.log("🎉 所有测试通过Socket JSON 传输系统工作正常", "SUCCESS")
else:
self.log("⚠️ 部分测试失败,请检查上述错误信息", "WARNING")
return success_count == total_tests
def save_test_report(self):
"""保存测试报告"""
report = {
"test_time": datetime.now().isoformat(),
"test_results": self.test_results,
"summary": f"测试完成,详细日志共 {len(self.test_results)}"
}
with open("test_report.json", 'w', encoding='utf-8') as f:
json.dump(report, f, ensure_ascii=False, indent=2)
self.log("📊 测试报告已保存到 test_report.json", "INFO")
def main():
"""主函数"""
print("🧪 Socket JSON 传输系统自动测试工具")
print("=" * 50)
if len(sys.argv) > 1:
command = sys.argv[1].lower()
if command == "help":
print("使用方法:")
print(" python test_socket.py - 运行完整测试")
print(" python test_socket.py full - 运行完整测试")
print(" python test_socket.py help - 显示帮助")
return
tester = SocketTester()
try:
success = tester.run_full_test()
tester.save_test_report()
if success:
print("\n🎊 测试全部通过!您的 Socket JSON 传输系统运行正常!")
else:
print("\n🔧 测试发现问题,请查看上述日志进行修复")
except KeyboardInterrupt:
print("\n⛔ 测试被用户中断")
tester.stop_server()
except Exception as e:
print(f"\n❌ 测试过程中出现异常: {e}")
tester.stop_server()
if __name__ == "__main__":
main()