538 lines
19 KiB
Python
538 lines
19 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
SUW Core - Hardware Manager Module
|
||
拆分自: suw_impl.py (Line 2183-2300, 4244-4273)
|
||
用途: Blender五金管理、硬件创建、几何体加载
|
||
版本: 1.0.0
|
||
作者: SUWood Team
|
||
"""
|
||
|
||
from .geometry_utils import Point3d, Transformation
|
||
from .material_manager import material_manager
|
||
from .memory_manager import memory_manager
|
||
from .data_manager import data_manager, get_data_manager
|
||
import time
|
||
import logging
|
||
import math
|
||
from typing import Dict, Any, List, Optional
|
||
|
||
# 设置日志
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 检查Blender可用性
|
||
try:
|
||
import bpy
|
||
BLENDER_AVAILABLE = True
|
||
except ImportError:
|
||
BLENDER_AVAILABLE = False
|
||
|
||
# 导入依赖模块
|
||
|
||
# ==================== 五金管理器类 ====================
|
||
|
||
|
||
class HardwareManager:
|
||
"""五金管理器 - 负责所有硬件相关操作"""
|
||
|
||
def __init__(self):
|
||
"""
|
||
初始化五金管理器 - 完全独立,不依赖suw_impl
|
||
"""
|
||
# 使用全局数据管理器
|
||
self.data_manager = get_data_manager()
|
||
|
||
# 五金数据存储
|
||
self.hardwares = {} # 按uid存储五金数据
|
||
|
||
# 创建统计
|
||
self.creation_stats = {
|
||
"hardwares_created": 0,
|
||
"files_loaded": 0,
|
||
"simple_hardwares": 0,
|
||
"creation_errors": 0
|
||
}
|
||
|
||
logger.info("✅ 五金管理器初始化完成")
|
||
|
||
# ==================== 原始命令方法 ====================
|
||
|
||
def c08(self, data: Dict[str, Any]):
|
||
"""add_hardware - 添加硬件 - 线程安全版本"""
|
||
try:
|
||
if not BLENDER_AVAILABLE:
|
||
logger.warning("Blender 不可用,跳过硬件创建")
|
||
return 0
|
||
|
||
uid = data.get("uid")
|
||
logger.info(f"🔧 执行c08命令: 添加硬件, uid={uid}")
|
||
|
||
def create_hardware():
|
||
try:
|
||
# 获取硬件数据集合
|
||
hardwares = self._get_hardwares(data)
|
||
items = data.get("items", [])
|
||
created_count = 0
|
||
|
||
for item in items:
|
||
root = item.get("root")
|
||
file_path = item.get("file")
|
||
ps = Point3d.parse(item.get("ps", "(0,0,0)"))
|
||
pe = Point3d.parse(item.get("pe", "(0,0,0)"))
|
||
|
||
# 根据是否有文件路径选择创建方式
|
||
if file_path:
|
||
hardware = self._load_hardware_file(
|
||
file_path, item, ps, pe)
|
||
if hardware:
|
||
self.creation_stats["files_loaded"] += 1
|
||
else:
|
||
hardware = self._create_simple_hardware(
|
||
ps, pe, item)
|
||
if hardware:
|
||
self.creation_stats["simple_hardwares"] += 1
|
||
|
||
if hardware:
|
||
# 设置硬件属性
|
||
hardware["sw_uid"] = uid
|
||
hardware["sw_root"] = root
|
||
hardware["sw_typ"] = "hw"
|
||
|
||
# 存储硬件
|
||
hardwares[root] = hardware
|
||
memory_manager.register_object(hardware)
|
||
created_count += 1
|
||
|
||
self.creation_stats["hardwares_created"] += created_count
|
||
return created_count
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建硬件失败: {e}")
|
||
self.creation_stats["creation_errors"] += 1
|
||
return 0
|
||
|
||
# 直接执行硬件创建
|
||
count = create_hardware()
|
||
|
||
if count > 0:
|
||
logger.info(f"✅ 成功创建硬件: uid={uid}, count={count}")
|
||
else:
|
||
logger.error(f"❌ 硬件创建失败: uid={uid}")
|
||
|
||
return count
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ 添加硬件失败: {e}")
|
||
self.creation_stats["creation_errors"] += 1
|
||
return 0
|
||
|
||
# ==================== 核心创建方法 ====================
|
||
|
||
def _load_hardware_file(self, file_path, item, ps, pe):
|
||
"""加载硬件文件"""
|
||
try:
|
||
logger.info(f"📁 加载硬件文件: {file_path}")
|
||
|
||
if not BLENDER_AVAILABLE:
|
||
return None
|
||
|
||
# 在实际应用中需要实现文件加载逻辑
|
||
# 这里创建占位符对象
|
||
hardware_name = f"Hardware_{item.get('root', 'unknown')}"
|
||
elem = bpy.data.objects.new(hardware_name, None)
|
||
bpy.context.scene.collection.objects.link(elem)
|
||
|
||
# 设置缩放 - 根据ps和pe计算
|
||
if ps and pe:
|
||
distance = math.sqrt((pe.x - ps.x)**2 +
|
||
(pe.y - ps.y)**2 + (pe.z - ps.z)**2)
|
||
if distance > 0:
|
||
elem.scale = (distance, 1.0, 1.0)
|
||
|
||
# 设置位置为中点
|
||
elem.location = (
|
||
(ps.x + pe.x) / 2,
|
||
(ps.y + pe.y) / 2,
|
||
(ps.z + pe.z) / 2
|
||
)
|
||
|
||
# 应用变换
|
||
if "trans" in item:
|
||
trans = Transformation.parse(item["trans"])
|
||
self._apply_transformation(elem, trans)
|
||
|
||
# 设置硬件属性
|
||
elem["sw_file_path"] = file_path
|
||
elem["sw_ps"] = ps.to_s() if ps else "(0,0,0)"
|
||
elem["sw_pe"] = pe.to_s() if pe else "(0,0,0)"
|
||
|
||
# 应用硬件材质
|
||
self._apply_hardware_material(elem, item)
|
||
|
||
logger.info(f"✅ 硬件文件加载成功: {hardware_name}")
|
||
return elem
|
||
|
||
except Exception as e:
|
||
logger.error(f"加载硬件文件失败: {e}")
|
||
return None
|
||
|
||
def _create_simple_hardware(self, ps, pe, item):
|
||
"""创建简单硬件几何体"""
|
||
try:
|
||
logger.info(f"🔧 创建简单硬件: ps={ps}, pe={pe}")
|
||
|
||
if not BLENDER_AVAILABLE:
|
||
return None
|
||
|
||
hardware_name = f"Simple_Hardware_{item.get('root', 'unknown')}"
|
||
elem = bpy.data.objects.new(hardware_name, None)
|
||
bpy.context.scene.collection.objects.link(elem)
|
||
|
||
# 创建路径
|
||
if ps and pe:
|
||
path = self._create_line_path(ps, pe)
|
||
elem["sw_path"] = str(path)
|
||
|
||
# 创建截面
|
||
sect = item.get("sect", {})
|
||
color = item.get("ckey")
|
||
|
||
# 使用follow_me创建几何体
|
||
if sect:
|
||
self._follow_me(
|
||
elem, sect, path if 'path' in locals() else None, color)
|
||
|
||
# 设置硬件属性
|
||
elem["sw_ckey"] = color
|
||
elem["sw_sect"] = str(sect)
|
||
elem["sw_ps"] = ps.to_s() if ps else "(0,0,0)"
|
||
elem["sw_pe"] = pe.to_s() if pe else "(0,0,0)"
|
||
|
||
# 应用硬件材质
|
||
self._apply_hardware_material(elem, item)
|
||
|
||
logger.info(f"✅ 简单硬件创建成功: {hardware_name}")
|
||
return elem
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建简单硬件失败: {e}")
|
||
return None
|
||
|
||
# ==================== 硬件纹理处理方法 ====================
|
||
|
||
def _textured_hw(self, hw, selected):
|
||
"""为硬件应用纹理 - 从选择管理器迁移"""
|
||
try:
|
||
if not hw:
|
||
return
|
||
|
||
# 设置硬件的选择材质
|
||
color = "mat_select" if selected else "mat_hardware"
|
||
texture = material_manager.get_texture(color)
|
||
|
||
if texture and hasattr(hw, 'data') and hw.data:
|
||
if not hw.data.materials:
|
||
hw.data.materials.append(texture)
|
||
else:
|
||
hw.data.materials[0] = texture
|
||
|
||
# 设置硬件可见性
|
||
if hasattr(hw, 'hide_viewport'):
|
||
hw.hide_viewport = False # 硬件通常总是可见
|
||
|
||
except Exception as e:
|
||
logger.error(f"为硬件应用纹理失败: {e}")
|
||
|
||
def _apply_hardware_material(self, hardware, item):
|
||
"""应用硬件材质"""
|
||
try:
|
||
# 获取硬件材质
|
||
color_key = item.get("ckey", "mat_hardware")
|
||
material = material_manager.get_texture(color_key)
|
||
|
||
if material and hasattr(hardware, 'data') and hardware.data:
|
||
# 如果硬件有网格数据,应用材质
|
||
if not hardware.data.materials:
|
||
hardware.data.materials.append(material)
|
||
else:
|
||
hardware.data.materials[0] = material
|
||
else:
|
||
# 如果硬件没有网格数据,设置自定义属性
|
||
hardware["sw_material"] = color_key
|
||
|
||
except Exception as e:
|
||
logger.error(f"应用硬件材质失败: {e}")
|
||
|
||
# ==================== 几何体创建辅助方法 ====================
|
||
|
||
def _create_line_path(self, ps, pe):
|
||
"""创建线性路径"""
|
||
try:
|
||
if not ps or not pe:
|
||
return None
|
||
|
||
# 创建简单的线性路径
|
||
path_data = {
|
||
"type": "line",
|
||
"start": [ps.x, ps.y, ps.z],
|
||
"end": [pe.x, pe.y, pe.z],
|
||
"length": math.sqrt((pe.x - ps.x)**2 + (pe.y - ps.y)**2 + (pe.z - ps.z)**2)
|
||
}
|
||
|
||
return path_data
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建线性路径失败: {e}")
|
||
return None
|
||
|
||
def _follow_me(self, container, surface, path, color):
|
||
"""Follow me操作 - 沿路径挤出截面"""
|
||
try:
|
||
if not BLENDER_AVAILABLE or not container:
|
||
return
|
||
|
||
# 这是一个简化的follow_me实现
|
||
# 在实际应用中需要根据具体的截面和路径数据实现
|
||
|
||
# 创建基本几何体作为占位符
|
||
if not container.data:
|
||
mesh = bpy.data.meshes.new(f"{container.name}_mesh")
|
||
|
||
# 创建简单的立方体作为占位符
|
||
vertices = [
|
||
(0, 0, 0), (1, 0, 0), (1, 1, 0), (0, 1, 0),
|
||
(0, 0, 1), (1, 0, 1), (1, 1, 1), (0, 1, 1)
|
||
]
|
||
edges = []
|
||
faces = [
|
||
(0, 1, 2, 3), (4, 7, 6, 5), (0, 4, 5, 1),
|
||
(1, 5, 6, 2), (2, 6, 7, 3), (3, 7, 4, 0)
|
||
]
|
||
|
||
mesh.from_pydata(vertices, edges, faces)
|
||
mesh.update()
|
||
container.data = mesh
|
||
|
||
logger.debug(f"Follow me操作完成: {container.name}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Follow me操作失败: {e}")
|
||
|
||
def _apply_transformation(self, obj, transformation):
|
||
"""应用变换到对象"""
|
||
try:
|
||
if not BLENDER_AVAILABLE or not obj or not transformation:
|
||
return
|
||
|
||
# 应用位置变换
|
||
if hasattr(transformation, 'origin'):
|
||
obj.location = (
|
||
transformation.origin.x,
|
||
transformation.origin.y,
|
||
transformation.origin.z
|
||
)
|
||
|
||
# 应用旋转变换(简化实现)
|
||
if hasattr(transformation, 'x_axis') and hasattr(transformation, 'y_axis'):
|
||
# 这里应该根据轴向量计算旋转,简化为默认旋转
|
||
pass
|
||
|
||
logger.debug(f"变换应用完成: {obj.name}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"应用变换失败: {e}")
|
||
|
||
# ==================== 辅助方法 ====================
|
||
|
||
def _get_hardwares(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""获取硬件数据 - 使用data_manager"""
|
||
return self.data_manager.get_hardwares(data)
|
||
|
||
def _is_object_valid(self, obj) -> bool:
|
||
"""检查对象是否有效"""
|
||
try:
|
||
if not obj or not BLENDER_AVAILABLE:
|
||
return False
|
||
return obj.name in bpy.data.objects
|
||
except:
|
||
return False
|
||
|
||
def _delete_object_safe(self, obj) -> bool:
|
||
"""安全删除对象"""
|
||
try:
|
||
if not obj or not BLENDER_AVAILABLE:
|
||
return False
|
||
|
||
if obj.name in bpy.data.objects:
|
||
bpy.data.objects.remove(obj, do_unlink=True)
|
||
return True
|
||
return False
|
||
except Exception as e:
|
||
logger.error(f"删除硬件对象失败: {e}")
|
||
return False
|
||
|
||
# ==================== 硬件管理方法 ====================
|
||
|
||
def create_hardware_batch(self, data: Dict[str, Any]) -> int:
|
||
"""批量创建硬件"""
|
||
try:
|
||
items = data.get("items", [])
|
||
if not items:
|
||
return 0
|
||
|
||
logger.info(f"🔧 开始批量创建硬件: {len(items)} 个")
|
||
|
||
created_count = 0
|
||
for item in items:
|
||
try:
|
||
# 解析参数
|
||
ps = Point3d.parse(item.get("ps", "(0,0,0)"))
|
||
pe = Point3d.parse(item.get("pe", "(0,0,0)"))
|
||
file_path = item.get("file")
|
||
|
||
# 创建硬件
|
||
if file_path:
|
||
hardware = self._load_hardware_file(
|
||
file_path, item, ps, pe)
|
||
else:
|
||
hardware = self._create_simple_hardware(ps, pe, item)
|
||
|
||
if hardware:
|
||
created_count += 1
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建单个硬件失败: {e}")
|
||
|
||
self.creation_stats["hardwares_created"] += created_count
|
||
logger.info(f"✅ 批量硬件创建完成: {created_count}/{len(items)} 成功")
|
||
|
||
return created_count
|
||
|
||
except Exception as e:
|
||
logger.error(f"批量创建硬件失败: {e}")
|
||
self.creation_stats["creation_errors"] += 1
|
||
return 0
|
||
|
||
def delete_hardware(self, uid: str, hw_id: int) -> bool:
|
||
"""删除单个硬件"""
|
||
try:
|
||
logger.info(f"🗑️ 删除硬件: uid={uid}, hw_id={hw_id}")
|
||
|
||
# 从本地存储中查找
|
||
if uid in self.hardwares and hw_id in self.hardwares[uid]:
|
||
hw_obj = self.hardwares[uid][hw_id]
|
||
if hw_obj and self._is_object_valid(hw_obj):
|
||
success = self._delete_object_safe(hw_obj)
|
||
if success:
|
||
del self.hardwares[uid][hw_id]
|
||
logger.info(f"✅ 硬件删除成功: uid={uid}, hw_id={hw_id}")
|
||
return True
|
||
|
||
logger.warning(f"硬件不存在或删除失败: uid={uid}, hw_id={hw_id}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"删除硬件失败: {e}")
|
||
return False
|
||
|
||
def delete_hardware_batch(self, uid: str, hw_ids: List[int]) -> int:
|
||
"""批量删除硬件"""
|
||
try:
|
||
deleted_count = 0
|
||
for hw_id in hw_ids:
|
||
if self.delete_hardware(uid, hw_id):
|
||
deleted_count += 1
|
||
|
||
logger.info(f"✅ 批量删除硬件完成: {deleted_count}/{len(hw_ids)} 成功")
|
||
return deleted_count
|
||
|
||
except Exception as e:
|
||
logger.error(f"批量删除硬件失败: {e}")
|
||
return 0
|
||
|
||
# ==================== 统计和管理方法 ====================
|
||
|
||
def get_hardware_stats(self) -> Dict[str, Any]:
|
||
"""获取硬件统计信息"""
|
||
try:
|
||
total_hardwares = sum(len(hw_dict)
|
||
for hw_dict in self.hardwares.values())
|
||
|
||
stats = {
|
||
"total_units": len(self.hardwares),
|
||
"total_hardwares": total_hardwares,
|
||
"creation_stats": self.creation_stats.copy(),
|
||
"hardware_types": {
|
||
"file_based": self.creation_stats["files_loaded"],
|
||
"simple_geometry": self.creation_stats["simple_hardwares"]
|
||
}
|
||
}
|
||
|
||
if BLENDER_AVAILABLE:
|
||
stats["blender_objects"] = len([obj for obj in bpy.data.objects
|
||
if obj.get("sw_typ") == "hw"])
|
||
|
||
return stats
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取硬件统计失败: {e}")
|
||
return {"error": str(e)}
|
||
|
||
def get_creation_stats(self) -> Dict[str, Any]:
|
||
"""获取创建统计信息"""
|
||
return self.creation_stats.copy()
|
||
|
||
def reset_creation_stats(self):
|
||
"""重置创建统计"""
|
||
self.creation_stats = {
|
||
"hardwares_created": 0,
|
||
"files_loaded": 0,
|
||
"simple_hardwares": 0,
|
||
"creation_errors": 0
|
||
}
|
||
logger.info("硬件统计已重置")
|
||
|
||
def cleanup(self):
|
||
"""清理硬件管理器"""
|
||
try:
|
||
# 删除所有硬件对象
|
||
total_deleted = 0
|
||
for uid, hw_dict in self.hardwares.items():
|
||
for hw_id, hw_obj in hw_dict.items():
|
||
if self._delete_object_safe(hw_obj):
|
||
total_deleted += 1
|
||
|
||
self.hardwares.clear()
|
||
self.reset_creation_stats()
|
||
|
||
logger.info(f"✅ 硬件管理器清理完成,删除了 {total_deleted} 个对象")
|
||
|
||
except Exception as e:
|
||
logger.error(f"清理硬件管理器失败: {e}")
|
||
|
||
def get_hardware_by_uid(self, uid: str) -> Dict[str, Any]:
|
||
"""根据UID获取硬件"""
|
||
return self.hardwares.get(uid, {})
|
||
|
||
def get_all_hardwares(self) -> Dict[str, Dict[str, Any]]:
|
||
"""获取所有硬件"""
|
||
return self.hardwares.copy()
|
||
|
||
|
||
# ==================== 全局硬件管理器实例 ====================
|
||
|
||
# 全局实例
|
||
hardware_manager = HardwareManager()
|
||
|
||
|
||
def init_hardware_manager():
|
||
"""初始化全局硬件管理器实例 - 不再需要suw_impl参数"""
|
||
global hardware_manager
|
||
hardware_manager = HardwareManager()
|
||
return hardware_manager
|
||
|
||
|
||
def get_hardware_manager():
|
||
"""获取全局硬件管理器实例"""
|
||
return hardware_manager
|