blenderpython/suw_core/hardware_manager.py

538 lines
19 KiB
Python
Raw Permalink Normal View History

2025-08-01 17:13:30 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SUW Core - Hardware Manager Module
拆分自: suw_impl.py (Line 2183-2300, 4244-4273)
用途: Blender五金管理硬件创建几何体加载
版本: 1.0.0
作者: SUWood Team
"""
from .geometry_utils import Point3d, Transformation
from .material_manager import material_manager
from .memory_manager import memory_manager
from .data_manager import data_manager, get_data_manager
import time
import logging
import math
from typing import Dict, Any, List, Optional
# 设置日志
logger = logging.getLogger(__name__)
# 检查Blender可用性
try:
import bpy
BLENDER_AVAILABLE = True
except ImportError:
BLENDER_AVAILABLE = False
# 导入依赖模块
# ==================== 五金管理器类 ====================
class HardwareManager:
"""五金管理器 - 负责所有硬件相关操作"""
def __init__(self):
"""
初始化五金管理器 - 完全独立不依赖suw_impl
"""
# 使用全局数据管理器
self.data_manager = get_data_manager()
# 五金数据存储
self.hardwares = {} # 按uid存储五金数据
# 创建统计
self.creation_stats = {
"hardwares_created": 0,
"files_loaded": 0,
"simple_hardwares": 0,
"creation_errors": 0
}
logger.info("✅ 五金管理器初始化完成")
# ==================== 原始命令方法 ====================
def c08(self, data: Dict[str, Any]):
"""add_hardware - 添加硬件 - 线程安全版本"""
try:
if not BLENDER_AVAILABLE:
logger.warning("Blender 不可用,跳过硬件创建")
return 0
uid = data.get("uid")
logger.info(f"🔧 执行c08命令: 添加硬件, uid={uid}")
def create_hardware():
try:
# 获取硬件数据集合
hardwares = self._get_hardwares(data)
items = data.get("items", [])
created_count = 0
for item in items:
root = item.get("root")
file_path = item.get("file")
ps = Point3d.parse(item.get("ps", "(0,0,0)"))
pe = Point3d.parse(item.get("pe", "(0,0,0)"))
# 根据是否有文件路径选择创建方式
if file_path:
hardware = self._load_hardware_file(
file_path, item, ps, pe)
if hardware:
self.creation_stats["files_loaded"] += 1
else:
hardware = self._create_simple_hardware(
ps, pe, item)
if hardware:
self.creation_stats["simple_hardwares"] += 1
if hardware:
# 设置硬件属性
hardware["sw_uid"] = uid
hardware["sw_root"] = root
hardware["sw_typ"] = "hw"
# 存储硬件
hardwares[root] = hardware
memory_manager.register_object(hardware)
created_count += 1
self.creation_stats["hardwares_created"] += created_count
return created_count
except Exception as e:
logger.error(f"创建硬件失败: {e}")
self.creation_stats["creation_errors"] += 1
return 0
# 直接执行硬件创建
count = create_hardware()
if count > 0:
logger.info(f"✅ 成功创建硬件: uid={uid}, count={count}")
else:
logger.error(f"❌ 硬件创建失败: uid={uid}")
return count
except Exception as e:
logger.error(f"❌ 添加硬件失败: {e}")
self.creation_stats["creation_errors"] += 1
return 0
# ==================== 核心创建方法 ====================
def _load_hardware_file(self, file_path, item, ps, pe):
"""加载硬件文件"""
try:
logger.info(f"📁 加载硬件文件: {file_path}")
if not BLENDER_AVAILABLE:
return None
# 在实际应用中需要实现文件加载逻辑
# 这里创建占位符对象
hardware_name = f"Hardware_{item.get('root', 'unknown')}"
elem = bpy.data.objects.new(hardware_name, None)
bpy.context.scene.collection.objects.link(elem)
# 设置缩放 - 根据ps和pe计算
if ps and pe:
distance = math.sqrt((pe.x - ps.x)**2 +
(pe.y - ps.y)**2 + (pe.z - ps.z)**2)
if distance > 0:
elem.scale = (distance, 1.0, 1.0)
# 设置位置为中点
elem.location = (
(ps.x + pe.x) / 2,
(ps.y + pe.y) / 2,
(ps.z + pe.z) / 2
)
# 应用变换
if "trans" in item:
trans = Transformation.parse(item["trans"])
self._apply_transformation(elem, trans)
# 设置硬件属性
elem["sw_file_path"] = file_path
elem["sw_ps"] = ps.to_s() if ps else "(0,0,0)"
elem["sw_pe"] = pe.to_s() if pe else "(0,0,0)"
# 应用硬件材质
self._apply_hardware_material(elem, item)
logger.info(f"✅ 硬件文件加载成功: {hardware_name}")
return elem
except Exception as e:
logger.error(f"加载硬件文件失败: {e}")
return None
def _create_simple_hardware(self, ps, pe, item):
"""创建简单硬件几何体"""
try:
logger.info(f"🔧 创建简单硬件: ps={ps}, pe={pe}")
if not BLENDER_AVAILABLE:
return None
hardware_name = f"Simple_Hardware_{item.get('root', 'unknown')}"
elem = bpy.data.objects.new(hardware_name, None)
bpy.context.scene.collection.objects.link(elem)
# 创建路径
if ps and pe:
path = self._create_line_path(ps, pe)
elem["sw_path"] = str(path)
# 创建截面
sect = item.get("sect", {})
color = item.get("ckey")
# 使用follow_me创建几何体
if sect:
self._follow_me(
elem, sect, path if 'path' in locals() else None, color)
# 设置硬件属性
elem["sw_ckey"] = color
elem["sw_sect"] = str(sect)
elem["sw_ps"] = ps.to_s() if ps else "(0,0,0)"
elem["sw_pe"] = pe.to_s() if pe else "(0,0,0)"
# 应用硬件材质
self._apply_hardware_material(elem, item)
logger.info(f"✅ 简单硬件创建成功: {hardware_name}")
return elem
except Exception as e:
logger.error(f"创建简单硬件失败: {e}")
return None
# ==================== 硬件纹理处理方法 ====================
def _textured_hw(self, hw, selected):
"""为硬件应用纹理 - 从选择管理器迁移"""
try:
if not hw:
return
# 设置硬件的选择材质
color = "mat_select" if selected else "mat_hardware"
texture = material_manager.get_texture(color)
if texture and hasattr(hw, 'data') and hw.data:
if not hw.data.materials:
hw.data.materials.append(texture)
else:
hw.data.materials[0] = texture
# 设置硬件可见性
if hasattr(hw, 'hide_viewport'):
hw.hide_viewport = False # 硬件通常总是可见
except Exception as e:
logger.error(f"为硬件应用纹理失败: {e}")
def _apply_hardware_material(self, hardware, item):
"""应用硬件材质"""
try:
# 获取硬件材质
color_key = item.get("ckey", "mat_hardware")
material = material_manager.get_texture(color_key)
if material and hasattr(hardware, 'data') and hardware.data:
# 如果硬件有网格数据,应用材质
if not hardware.data.materials:
hardware.data.materials.append(material)
else:
hardware.data.materials[0] = material
else:
# 如果硬件没有网格数据,设置自定义属性
hardware["sw_material"] = color_key
except Exception as e:
logger.error(f"应用硬件材质失败: {e}")
# ==================== 几何体创建辅助方法 ====================
def _create_line_path(self, ps, pe):
"""创建线性路径"""
try:
if not ps or not pe:
return None
# 创建简单的线性路径
path_data = {
"type": "line",
"start": [ps.x, ps.y, ps.z],
"end": [pe.x, pe.y, pe.z],
"length": math.sqrt((pe.x - ps.x)**2 + (pe.y - ps.y)**2 + (pe.z - ps.z)**2)
}
return path_data
except Exception as e:
logger.error(f"创建线性路径失败: {e}")
return None
def _follow_me(self, container, surface, path, color):
"""Follow me操作 - 沿路径挤出截面"""
try:
if not BLENDER_AVAILABLE or not container:
return
# 这是一个简化的follow_me实现
# 在实际应用中需要根据具体的截面和路径数据实现
# 创建基本几何体作为占位符
if not container.data:
mesh = bpy.data.meshes.new(f"{container.name}_mesh")
# 创建简单的立方体作为占位符
vertices = [
(0, 0, 0), (1, 0, 0), (1, 1, 0), (0, 1, 0),
(0, 0, 1), (1, 0, 1), (1, 1, 1), (0, 1, 1)
]
edges = []
faces = [
(0, 1, 2, 3), (4, 7, 6, 5), (0, 4, 5, 1),
(1, 5, 6, 2), (2, 6, 7, 3), (3, 7, 4, 0)
]
mesh.from_pydata(vertices, edges, faces)
mesh.update()
container.data = mesh
logger.debug(f"Follow me操作完成: {container.name}")
except Exception as e:
logger.error(f"Follow me操作失败: {e}")
def _apply_transformation(self, obj, transformation):
"""应用变换到对象"""
try:
if not BLENDER_AVAILABLE or not obj or not transformation:
return
# 应用位置变换
if hasattr(transformation, 'origin'):
obj.location = (
transformation.origin.x,
transformation.origin.y,
transformation.origin.z
)
# 应用旋转变换(简化实现)
if hasattr(transformation, 'x_axis') and hasattr(transformation, 'y_axis'):
# 这里应该根据轴向量计算旋转,简化为默认旋转
pass
logger.debug(f"变换应用完成: {obj.name}")
except Exception as e:
logger.error(f"应用变换失败: {e}")
# ==================== 辅助方法 ====================
def _get_hardwares(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""获取硬件数据 - 使用data_manager"""
return self.data_manager.get_hardwares(data)
def _is_object_valid(self, obj) -> bool:
"""检查对象是否有效"""
try:
if not obj or not BLENDER_AVAILABLE:
return False
return obj.name in bpy.data.objects
except:
return False
def _delete_object_safe(self, obj) -> bool:
"""安全删除对象"""
try:
if not obj or not BLENDER_AVAILABLE:
return False
if obj.name in bpy.data.objects:
bpy.data.objects.remove(obj, do_unlink=True)
return True
return False
except Exception as e:
logger.error(f"删除硬件对象失败: {e}")
return False
# ==================== 硬件管理方法 ====================
def create_hardware_batch(self, data: Dict[str, Any]) -> int:
"""批量创建硬件"""
try:
items = data.get("items", [])
if not items:
return 0
logger.info(f"🔧 开始批量创建硬件: {len(items)}")
created_count = 0
for item in items:
try:
# 解析参数
ps = Point3d.parse(item.get("ps", "(0,0,0)"))
pe = Point3d.parse(item.get("pe", "(0,0,0)"))
file_path = item.get("file")
# 创建硬件
if file_path:
hardware = self._load_hardware_file(
file_path, item, ps, pe)
else:
hardware = self._create_simple_hardware(ps, pe, item)
if hardware:
created_count += 1
except Exception as e:
logger.error(f"创建单个硬件失败: {e}")
self.creation_stats["hardwares_created"] += created_count
logger.info(f"✅ 批量硬件创建完成: {created_count}/{len(items)} 成功")
return created_count
except Exception as e:
logger.error(f"批量创建硬件失败: {e}")
self.creation_stats["creation_errors"] += 1
return 0
def delete_hardware(self, uid: str, hw_id: int) -> bool:
"""删除单个硬件"""
try:
logger.info(f"🗑️ 删除硬件: uid={uid}, hw_id={hw_id}")
# 从本地存储中查找
if uid in self.hardwares and hw_id in self.hardwares[uid]:
hw_obj = self.hardwares[uid][hw_id]
if hw_obj and self._is_object_valid(hw_obj):
success = self._delete_object_safe(hw_obj)
if success:
del self.hardwares[uid][hw_id]
logger.info(f"✅ 硬件删除成功: uid={uid}, hw_id={hw_id}")
return True
logger.warning(f"硬件不存在或删除失败: uid={uid}, hw_id={hw_id}")
return False
except Exception as e:
logger.error(f"删除硬件失败: {e}")
return False
def delete_hardware_batch(self, uid: str, hw_ids: List[int]) -> int:
"""批量删除硬件"""
try:
deleted_count = 0
for hw_id in hw_ids:
if self.delete_hardware(uid, hw_id):
deleted_count += 1
logger.info(f"✅ 批量删除硬件完成: {deleted_count}/{len(hw_ids)} 成功")
return deleted_count
except Exception as e:
logger.error(f"批量删除硬件失败: {e}")
return 0
# ==================== 统计和管理方法 ====================
def get_hardware_stats(self) -> Dict[str, Any]:
"""获取硬件统计信息"""
try:
total_hardwares = sum(len(hw_dict)
for hw_dict in self.hardwares.values())
stats = {
"total_units": len(self.hardwares),
"total_hardwares": total_hardwares,
"creation_stats": self.creation_stats.copy(),
"hardware_types": {
"file_based": self.creation_stats["files_loaded"],
"simple_geometry": self.creation_stats["simple_hardwares"]
}
}
if BLENDER_AVAILABLE:
stats["blender_objects"] = len([obj for obj in bpy.data.objects
if obj.get("sw_typ") == "hw"])
return stats
except Exception as e:
logger.error(f"获取硬件统计失败: {e}")
return {"error": str(e)}
def get_creation_stats(self) -> Dict[str, Any]:
"""获取创建统计信息"""
return self.creation_stats.copy()
def reset_creation_stats(self):
"""重置创建统计"""
self.creation_stats = {
"hardwares_created": 0,
"files_loaded": 0,
"simple_hardwares": 0,
"creation_errors": 0
}
logger.info("硬件统计已重置")
def cleanup(self):
"""清理硬件管理器"""
try:
# 删除所有硬件对象
total_deleted = 0
for uid, hw_dict in self.hardwares.items():
for hw_id, hw_obj in hw_dict.items():
if self._delete_object_safe(hw_obj):
total_deleted += 1
self.hardwares.clear()
self.reset_creation_stats()
logger.info(f"✅ 硬件管理器清理完成,删除了 {total_deleted} 个对象")
except Exception as e:
logger.error(f"清理硬件管理器失败: {e}")
def get_hardware_by_uid(self, uid: str) -> Dict[str, Any]:
"""根据UID获取硬件"""
return self.hardwares.get(uid, {})
def get_all_hardwares(self) -> Dict[str, Dict[str, Any]]:
"""获取所有硬件"""
return self.hardwares.copy()
# ==================== 全局硬件管理器实例 ====================
# 全局实例
hardware_manager = HardwareManager()
def init_hardware_manager():
"""初始化全局硬件管理器实例 - 不再需要suw_impl参数"""
global hardware_manager
hardware_manager = HardwareManager()
return hardware_manager
def get_hardware_manager():
"""获取全局硬件管理器实例"""
return hardware_manager