irg/Isometquick-server/test_client.py

342 lines
11 KiB
Python
Raw Normal View History

2025-07-18 17:26:21 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
API测试客户端
用于测试Isometric Room Generator渲染服务
"""
import requests
import time
import json
import sys
class IRGClient:
    """HTTP client for the Isometric Room Generator render API.

    Every method returns a plain dict. Failures (network errors, non-2xx
    responses where checked, undecodable JSON, file errors) are not raised
    to the caller; they are reported as ``{"error": "<message>"}`` so callers
    can simply test ``"error" in result``.
    """

    def __init__(self, base_url: str = "http://localhost:8003"):
        """Remember the service root URL with any trailing slashes removed."""
        self.base_url = base_url.rstrip('/')

    def health_check(self) -> dict:
        """GET ``/health`` and return the decoded JSON body.

        Unlike the other methods, this one does not call
        ``raise_for_status()``, so a JSON body sent with a non-2xx status is
        still returned as-is.
        """
        try:
            response = requests.get(f"{self.base_url}/health", timeout=10)
            return response.json()
        except Exception as e:
            # Best-effort contract: report the failure instead of raising.
            return {"error": str(e)}

    def create_render_task(self, request_data: dict) -> dict:
        """POST *request_data* as JSON to ``/render`` and return the reply.

        Returns the decoded JSON response, or ``{"error": ...}`` on any
        failure (including a non-2xx HTTP status).
        """
        try:
            response = requests.post(
                f"{self.base_url}/render",
                json=request_data,
                timeout=30,
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            return {"error": str(e)}

    def get_task_status(self, task_id: str) -> dict:
        """GET ``/render/{task_id}`` and return the decoded JSON status.

        Returns ``{"error": ...}`` on any failure (including a non-2xx HTTP
        status).
        """
        try:
            response = requests.get(
                f"{self.base_url}/render/{task_id}", timeout=10)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            return {"error": str(e)}

    def download_result(self, task_id: str, output_file: str) -> dict:
        """Download ``/render/{task_id}/download`` into *output_file*.

        The response body is written in binary mode. Returns
        ``{"success": True, "file": output_file}`` on success, or
        ``{"error": ...}`` if the request or the file write fails.
        """
        try:
            response = requests.get(
                f"{self.base_url}/render/{task_id}/download",
                timeout=60,
            )
            response.raise_for_status()
            with open(output_file, 'wb') as f:
                f.write(response.content)
            return {"success": True, "file": output_file}
        except Exception as e:
            return {"error": str(e)}

    def wait_for_completion(self, task_id: str, max_wait: int = 300,
                            poll_interval: float = 2.0) -> dict:
        """Poll the task status until it finishes, errors, or times out.

        Args:
            task_id: Identifier of the render task to poll.
            max_wait: Maximum number of seconds to keep polling.
            poll_interval: Seconds to sleep between two polls.

        Returns:
            The status dict as soon as its ``"status"`` is ``"completed"``
            or ``"failed"``; the error dict from ``get_task_status`` if a
            poll fails; or ``{"error": "任务超时"}`` once *max_wait* elapses.
        """
        # A monotonic clock keeps the timeout correct even if the system
        # wall clock is adjusted while we are waiting.
        deadline = time.monotonic() + max_wait
        while time.monotonic() < deadline:
            status_result = self.get_task_status(task_id)
            if "error" in status_result:
                return status_result
            status = status_result.get("status")
            print(f"任务状态: {status}")
            if status in ("completed", "failed"):
                return status_result
            # Never sleep past the deadline, and tolerate a negative interval.
            remaining = deadline - time.monotonic()
            time.sleep(max(0.0, min(poll_interval, remaining)))
        return {"error": "任务超时"}
def test_basic_render():
    """Run the basic end-to-end render scenario against the local service.

    Steps: health check, create one render task, poll until it finishes,
    then download the image to ``test_render_<task_id>.png``. Progress and
    failures are printed; the function returns early (with ``None``) as soon
    as any step fails.
    """
    print("=" * 60)
    print("测试基本渲染功能")
    print("=" * 60)
    client = IRGClient()

    # 1. Health check: stop unless the service reports Blender as available.
    print("1. 健康检查...")
    health = client.health_check()
    print(f"健康状态: {json.dumps(health, indent=2, ensure_ascii=False)}")
    if not health.get("blender_available"):
        print("❌ Blender不可用请检查安装")
        return

    # 2. Create the render task.
    print("\n2. 创建渲染任务...")
    request_data = {
        "room": {
            "length": 7.0,
            "width": 4.0,
            "height": 3.0,
            "prop_type": 0,  # 0=none, 1=floor-to-ceiling window, 2=arch, 3=door
        },
        "camera": {
            "height": 1.3,
            "view_type": 2,  # front view = 1, side view = 2
            "rotation_angle": 45.0,
        },
        "render": {
            "resolution_x": 1080,
            "resolution_y": 2400,
            "engine": "eevee",  # fixed to EEVEE
        },
    }
    create_result = client.create_render_task(request_data)
    print(f"创建结果: {json.dumps(create_result, indent=2, ensure_ascii=False)}")
    if "error" in create_result:
        print("❌ 任务创建失败")
        return
    task_id = create_result.get("task_id")
    if not task_id:
        print("❌ 未获取到任务ID")
        return

    # 3. Wait for the task to finish.
    print(f"\n3. 等待任务完成 (ID: {task_id})...")
    completion_result = client.wait_for_completion(task_id)
    if "error" in completion_result:
        print(f"❌ 任务失败: {completion_result['error']}")
        return

    if completion_result.get("status") == "completed":
        print("✅ 任务完成!")
        # 4. Download the rendered image.
        print("\n4. 下载渲染结果...")
        output_file = f"test_render_{task_id}.png"
        download_result = client.download_result(task_id, output_file)
        if "error" in download_result:
            print(f"❌ 下载失败: {download_result['error']}")
        else:
            print(f"✅ 渲染结果已保存到: {output_file}")
    else:
        print(f"❌ 任务失败: {completion_result.get('error')}")
def test_different_views():
    """Submit one render task per camera view and print its early status.

    Covers the front view and two side-view rotation angles. Tasks are not
    awaited: after submission the status is read once, one second later.
    """
    print("=" * 60)
    print("测试不同视图")
    print("=" * 60)
    client = IRGClient()

    # (display name, view_type, rotation_angle); front view = 1, side view = 2.
    view_specs = [
        ("正视图", 1, 0.0),
        ("侧视图-45度", 2, 45.0),
        ("侧视图-30度", 2, 30.0),
    ]
    # Only the camera view and angle vary; room and render settings are fixed.
    test_cases = [
        {
            "name": name,
            "data": {
                "room": {"length": 4.0, "width": 4.0, "height": 3.0, "prop_type": 0},
                "camera": {"height": 1.3, "view_type": view_type, "rotation_angle": angle},
                "render": {"resolution_x": 800, "resolution_y": 600, "engine": "eevee"},
            },
        }
        for name, view_type, angle in view_specs
    ]

    for i, test_case in enumerate(test_cases, 1):
        print(f"\n{i}. 测试{test_case['name']}...")
        create_result = client.create_render_task(test_case['data'])
        if "error" in create_result:
            print(f"❌ 创建失败: {create_result['error']}")
            continue
        task_id = create_result.get("task_id")
        if not task_id:
            # Without an id there is nothing to poll; skip instead of
            # querying "/render/None".
            print("❌ 未获取到任务ID")
            continue
        print(f"任务ID: {task_id}")
        # Quick check only (does not wait for completion).
        time.sleep(1)
        status = client.get_task_status(task_id)
        print(f"当前状态: {status.get('status', 'unknown')}")
def test_prop_types():
    """Submit one render task per prop type and print its early status.

    Covers prop_type 0 through 3. Tasks are not awaited: after submission
    the status is read once, one second later.
    """
    print("=" * 60)
    print("测试不同道具类型")
    print("=" * 60)
    client = IRGClient()

    # (display name, prop_type): 0=none, 1=floor-to-ceiling window,
    # 2=arch, 3=door. The door case previously had an empty display name.
    prop_specs = [
        ("无道具", 0),
        ("落地窗", 1),
        ("拱门", 2),
        ("门", 3),
    ]
    # Only the prop type varies; camera and render settings are fixed.
    test_cases = [
        {
            "name": name,
            "data": {
                "room": {"length": 5.0, "width": 4.0, "height": 3.0, "prop_type": prop_type},
                "camera": {"height": 1.3, "view_type": 2, "rotation_angle": 45.0},
                "render": {"resolution_x": 800, "resolution_y": 600, "engine": "eevee"},
            },
        }
        for name, prop_type in prop_specs
    ]

    for i, test_case in enumerate(test_cases, 1):
        print(f"\n{i}. 测试{test_case['name']}...")
        create_result = client.create_render_task(test_case['data'])
        if "error" in create_result:
            print(f"❌ 创建失败: {create_result['error']}")
            continue
        task_id = create_result.get("task_id")
        if not task_id:
            # Without an id there is nothing to poll; skip instead of
            # querying "/render/None".
            print("❌ 未获取到任务ID")
            continue
        print(f"任务ID: {task_id}")
        # Quick check only (does not wait for completion).
        time.sleep(1)
        status = client.get_task_status(task_id)
        print(f"当前状态: {status.get('status', 'unknown')}")
def test_room_dimensions():
    """Submit render tasks that exercise the room-dimension handling.

    Sends one room with length >= width and one with width > length, then
    prints each task's status once, one second after submission. Tasks are
    not awaited.
    """
    print("=" * 60)
    print("测试房间尺寸调整逻辑")
    print("=" * 60)
    client = IRGClient()

    # (display name, length, width); the second case is the width > length one.
    dimension_specs = [
        ("正常尺寸 (length >= width)", 6.0, 4.0),
        ("需要调整 (width > length)", 4.0, 6.0),
    ]
    # Only length and width vary; everything else is fixed.
    test_cases = [
        {
            "name": name,
            "data": {
                "room": {"length": length, "width": width, "height": 3.0, "prop_type": 0},
                "camera": {"height": 1.3, "view_type": 2, "rotation_angle": 45.0},
                "render": {"resolution_x": 800, "resolution_y": 600, "engine": "eevee"},
            },
        }
        for name, length, width in dimension_specs
    ]

    for i, test_case in enumerate(test_cases, 1):
        print(f"\n{i}. 测试{test_case['name']}...")
        create_result = client.create_render_task(test_case['data'])
        if "error" in create_result:
            print(f"❌ 创建失败: {create_result['error']}")
            continue
        task_id = create_result.get("task_id")
        if not task_id:
            # Without an id there is nothing to poll; skip instead of
            # querying "/render/None".
            print("❌ 未获取到任务ID")
            continue
        print(f"任务ID: {task_id}")
        # Quick check only (does not wait for completion).
        time.sleep(1)
        status = client.get_task_status(task_id)
        print(f"当前状态: {status.get('status', 'unknown')}")
def main():
    """Command-line entry point: run the test selected by the first argument.

    With no argument the basic render test runs. ``views``, ``props`` and
    ``dimensions`` select the matching test function; any other value
    prints the usage text.
    """
    cli_args = sys.argv[1:]
    if not cli_args:
        test_basic_render()
        return

    mode = cli_args[0]
    if mode == "views":
        test_different_views()
        return
    if mode == "props":
        test_prop_types()
        return
    if mode == "dimensions":
        test_room_dimensions()
        return

    # Unrecognised mode: show the usage text.
    usage_lines = (
        "用法: python test_client.py [views|props|dimensions]",
        " views - 测试不同视图",
        " props - 测试不同道具类型",
        " dimensions - 测试房间尺寸调整",
    )
    for usage_line in usage_lines:
        print(usage_line)


if __name__ == "__main__":
    main()