irg/Isometquick-server/main.py

235 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Isometquick Blender渲染微服务
基于FastAPI的Blender等轴测房间渲染API服务
"""
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse, FileResponse
from pydantic import BaseModel, Field
from typing import Optional, Literal
import os
import asyncio
import uuid
from datetime import datetime
import logging
from blender_service import BlenderRenderService
from models import RenderRequest, RenderResponse, RenderStatus
# Configure logging: INFO level via basicConfig, plus a module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Create the FastAPI application; interactive docs are exposed at /docs and /redoc.
app = FastAPI(
    title="Isometquick Blender渲染服务",
    description="基于Blender的等轴测房间渲染API微服务",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)
# Create the single Blender service instance used by this module.
blender_service = BlenderRenderService()
# In-memory registry of render-task records keyed by task ID.
# A production deployment should use Redis or a similar external store.
render_tasks = {}
@app.get("/")
async def root():
    """Report basic service identity.

    Returns:
        A dict holding the service name, a fixed ``"running"`` status, the
        version string and the current local time in ISO 8601 format.
    """
    now_iso = datetime.now().isoformat()
    return dict(
        service="Isometquick Blender渲染服务",
        status="running",
        version="1.0.0",
        timestamp=now_iso,
    )
@app.get("/health")
async def health_check():
    """Report whether the Blender backend is usable.

    Returns:
        On success, a dict with ``status`` (``"healthy"`` or ``"unhealthy"``),
        the raw ``blender_available`` value and an ISO 8601 timestamp.
        If the check itself raises, a ``JSONResponse`` with HTTP 500 carrying
        ``status="unhealthy"`` and the error text.
    """
    try:
        # Ask the Blender service whether Blender can be used.
        available = blender_service.check_blender_available()
        if available:
            state = "healthy"
        else:
            state = "unhealthy"
        return {
            "status": state,
            "blender_available": available,
            "timestamp": datetime.now().isoformat(),
        }
    except Exception as e:
        logger.error(f"健康检查失败: {e}")
        failure_payload = {"status": "unhealthy", "error": str(e)}
        return JSONResponse(status_code=500, content=failure_payload)
@app.post("/render", response_model=RenderResponse)
async def create_render_task(
    request: RenderRequest,
    background_tasks: BackgroundTasks
):
    """Register a new render task and schedule it to run in the background.

    Args:
        request: The render parameters supplied by the client.
        background_tasks: FastAPI background-task queue for this request.

    Returns:
        A ``RenderResponse`` carrying the new task ID and ``"pending"`` status.

    Raises:
        HTTPException: 500 with the error text if anything fails while
            creating the task.
    """
    try:
        # A random UUID identifies the task in every later call.
        task_id = str(uuid.uuid4())

        # Initial bookkeeping record for the task.
        record = dict(
            status="pending",
            created_at=datetime.now().isoformat(),
            request=request.dict(),
            output_file=None,
            error=None,
        )
        render_tasks[task_id] = record

        # Queue the actual rendering as a background task.
        background_tasks.add_task(execute_render_task, task_id, request)
        logger.info(f"创建渲染任务: {task_id}")

        response = RenderResponse(
            task_id=task_id,
            status="pending",
            message="渲染任务已创建,正在处理中",
        )
        return response
    except Exception as e:
        logger.error(f"创建渲染任务失败: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/render/{task_id}", response_model=RenderStatus)
async def get_render_status(task_id: str):
    """Return the current state of a render task.

    Args:
        task_id: Identifier returned when the task was created.

    Returns:
        A ``RenderStatus`` built from the stored task record.

    Raises:
        HTTPException: 404 if no task with this ID is registered.
    """
    record = render_tasks.get(task_id)
    if record is None:
        raise HTTPException(status_code=404, detail="任务不存在")

    status_fields = {
        "task_id": task_id,
        "status": record["status"],
        "created_at": record["created_at"],
        "output_file": record.get("output_file"),
        "error": record.get("error"),
    }
    return RenderStatus(**status_fields)
@app.get("/render/{task_id}/download")
async def download_render_result(task_id: str):
    """Send the rendered image of a completed task.

    Args:
        task_id: Identifier returned when the task was created.

    Returns:
        A ``FileResponse`` serving the output file as ``image/png`` under the
        download name ``render_<task_id>.png``.

    Raises:
        HTTPException: 404 if the task is unknown, 400 if the task is not in
            the ``"completed"`` state, 404 if the output file is missing.
    """
    record = render_tasks.get(task_id)
    if record is None:
        raise HTTPException(status_code=404, detail="任务不存在")

    current_status = record["status"]
    if current_status != "completed":
        raise HTTPException(
            status_code=400,
            detail=f"任务状态: {current_status}, 无法下载",
        )

    result_path = record.get("output_file")
    # The file must be recorded and still present on disk.
    file_is_present = bool(result_path) and os.path.exists(result_path)
    if not file_is_present:
        raise HTTPException(status_code=404, detail="渲染文件不存在")

    return FileResponse(
        path=result_path,
        filename=f"render_{task_id}.png",
        media_type="image/png",
    )
@app.delete("/render/{task_id}")
async def delete_render_task(task_id: str):
    """Remove a render task and its output file, if any.

    Args:
        task_id: Identifier returned when the task was created.

    Returns:
        A dict with a confirmation message.

    Raises:
        HTTPException: 404 if no task with this ID is registered.
    """
    record = render_tasks.get(task_id)
    if record is None:
        raise HTTPException(status_code=404, detail="任务不存在")

    # Best-effort removal of the output file; a failure is logged and does
    # not prevent the task record from being removed.
    artifact = record.get("output_file")
    if artifact and os.path.exists(artifact):
        try:
            os.remove(artifact)
            logger.info(f"删除文件: {artifact}")
        except Exception as e:
            logger.warning(f"删除文件失败: {e}")

    # Drop the task record itself.
    render_tasks.pop(task_id)
    return {"message": f"任务 {task_id} 已删除"}
@app.get("/tasks")
async def list_render_tasks():
    """List every registered render task in summary form.

    Returns:
        A dict with ``total`` (number of tasks) and ``tasks``, a list of
        ``{"task_id", "status", "created_at"}`` summaries.
    """
    summaries = [
        dict(
            task_id=tid,
            status=info["status"],
            created_at=info["created_at"],
        )
        for tid, info in render_tasks.items()
    ]
    return {"total": len(summaries), "tasks": summaries}
async def execute_render_task(task_id: str, request: RenderRequest):
    """Run one render in the background and record its outcome.

    The task record is looked up once and then updated through that
    reference, so a concurrent ``DELETE /render/{task_id}`` cannot make this
    function raise ``KeyError`` (previously the error handler itself failed
    on the missing key, hiding the real render error).

    Args:
        task_id: Key of the task record in ``render_tasks``.
        request: The render parameters passed on to the Blender service.

    Returns:
        None. The outcome is written into the task record: ``"completed"``
        with ``output_file``/``completed_at``, or ``"failed"`` with
        ``error``/``failed_at``.
    """
    task = render_tasks.get(task_id)
    if task is None:
        # The task was deleted before this job started: there is nothing to
        # record a result in, so do not spend time rendering.
        logger.warning(f"渲染任务已被删除, 跳过渲染: {task_id}")
        return

    try:
        # Mark the task as in progress.
        task["status"] = "processing"
        logger.info(f"开始处理渲染任务: {task_id}")
        # Delegate the actual rendering to the Blender service.
        output_file = await blender_service.render_room(task_id, request)
    except Exception as e:
        # Record the failure on the held reference; this is safe even if the
        # record was removed from the registry in the meantime.
        task.update({
            "status": "failed",
            "error": str(e),
            "failed_at": datetime.now().isoformat()
        })
        logger.error(f"渲染任务失败: {task_id}, 错误: {e}")
        return

    if render_tasks.get(task_id) is not task:
        # The task was deleted while rendering. Nobody can download the
        # result any more, so remove the file instead of leaking it.
        logger.warning(f"渲染任务在渲染期间被删除, 丢弃输出: {task_id}")
        if output_file and os.path.exists(output_file):
            try:
                os.remove(output_file)
                logger.info(f"删除文件: {output_file}")
            except OSError as e:
                logger.warning(f"删除文件失败: {e}")
        return

    # Record the successful result.
    task.update({
        "status": "completed",
        "output_file": output_file,
        "completed_at": datetime.now().isoformat()
    })
    logger.info(f"渲染任务完成: {task_id}, 输出文件: {output_file}")
# Script entry point: serve the app with uvicorn on host 0.0.0.0, port 8003.
if __name__ == "__main__":
    # Imported inside the guard, so uvicorn is only imported when this file
    # is run as a script.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8003)