blenderpython/suw_core/hardware_manager.py

538 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SUW Core - Hardware Manager Module
拆分自: suw_impl.py (Line 2183-2300, 4244-4273)
用途: Blender五金管理、硬件创建、几何体加载
版本: 1.0.0
作者: SUWood Team
"""
from .geometry_utils import Point3d, Transformation
from .material_manager import material_manager
from .memory_manager import memory_manager
from .data_manager import data_manager, get_data_manager
import time
import logging
import math
from typing import Dict, Any, List, Optional
# 设置日志
logger = logging.getLogger(__name__)
# 检查Blender可用性
try:
import bpy
BLENDER_AVAILABLE = True
except ImportError:
BLENDER_AVAILABLE = False
# 导入依赖模块
# ==================== 五金管理器类 ====================
class HardwareManager:
"""五金管理器 - 负责所有硬件相关操作"""
def __init__(self):
"""
初始化五金管理器 - 完全独立不依赖suw_impl
"""
# 使用全局数据管理器
self.data_manager = get_data_manager()
# 五金数据存储
self.hardwares = {} # 按uid存储五金数据
# 创建统计
self.creation_stats = {
"hardwares_created": 0,
"files_loaded": 0,
"simple_hardwares": 0,
"creation_errors": 0
}
logger.info("✅ 五金管理器初始化完成")
# ==================== 原始命令方法 ====================
def c08(self, data: Dict[str, Any]):
"""add_hardware - 添加硬件 - 线程安全版本"""
try:
if not BLENDER_AVAILABLE:
logger.warning("Blender 不可用,跳过硬件创建")
return 0
uid = data.get("uid")
logger.info(f"🔧 执行c08命令: 添加硬件, uid={uid}")
def create_hardware():
try:
# 获取硬件数据集合
hardwares = self._get_hardwares(data)
items = data.get("items", [])
created_count = 0
for item in items:
root = item.get("root")
file_path = item.get("file")
ps = Point3d.parse(item.get("ps", "(0,0,0)"))
pe = Point3d.parse(item.get("pe", "(0,0,0)"))
# 根据是否有文件路径选择创建方式
if file_path:
hardware = self._load_hardware_file(
file_path, item, ps, pe)
if hardware:
self.creation_stats["files_loaded"] += 1
else:
hardware = self._create_simple_hardware(
ps, pe, item)
if hardware:
self.creation_stats["simple_hardwares"] += 1
if hardware:
# 设置硬件属性
hardware["sw_uid"] = uid
hardware["sw_root"] = root
hardware["sw_typ"] = "hw"
# 存储硬件
hardwares[root] = hardware
memory_manager.register_object(hardware)
created_count += 1
self.creation_stats["hardwares_created"] += created_count
return created_count
except Exception as e:
logger.error(f"创建硬件失败: {e}")
self.creation_stats["creation_errors"] += 1
return 0
# 直接执行硬件创建
count = create_hardware()
if count > 0:
logger.info(f"✅ 成功创建硬件: uid={uid}, count={count}")
else:
logger.error(f"❌ 硬件创建失败: uid={uid}")
return count
except Exception as e:
logger.error(f"❌ 添加硬件失败: {e}")
self.creation_stats["creation_errors"] += 1
return 0
# ==================== 核心创建方法 ====================
def _load_hardware_file(self, file_path, item, ps, pe):
"""加载硬件文件"""
try:
logger.info(f"📁 加载硬件文件: {file_path}")
if not BLENDER_AVAILABLE:
return None
# 在实际应用中需要实现文件加载逻辑
# 这里创建占位符对象
hardware_name = f"Hardware_{item.get('root', 'unknown')}"
elem = bpy.data.objects.new(hardware_name, None)
bpy.context.scene.collection.objects.link(elem)
# 设置缩放 - 根据ps和pe计算
if ps and pe:
distance = math.sqrt((pe.x - ps.x)**2 +
(pe.y - ps.y)**2 + (pe.z - ps.z)**2)
if distance > 0:
elem.scale = (distance, 1.0, 1.0)
# 设置位置为中点
elem.location = (
(ps.x + pe.x) / 2,
(ps.y + pe.y) / 2,
(ps.z + pe.z) / 2
)
# 应用变换
if "trans" in item:
trans = Transformation.parse(item["trans"])
self._apply_transformation(elem, trans)
# 设置硬件属性
elem["sw_file_path"] = file_path
elem["sw_ps"] = ps.to_s() if ps else "(0,0,0)"
elem["sw_pe"] = pe.to_s() if pe else "(0,0,0)"
# 应用硬件材质
self._apply_hardware_material(elem, item)
logger.info(f"✅ 硬件文件加载成功: {hardware_name}")
return elem
except Exception as e:
logger.error(f"加载硬件文件失败: {e}")
return None
def _create_simple_hardware(self, ps, pe, item):
"""创建简单硬件几何体"""
try:
logger.info(f"🔧 创建简单硬件: ps={ps}, pe={pe}")
if not BLENDER_AVAILABLE:
return None
hardware_name = f"Simple_Hardware_{item.get('root', 'unknown')}"
elem = bpy.data.objects.new(hardware_name, None)
bpy.context.scene.collection.objects.link(elem)
# 创建路径
if ps and pe:
path = self._create_line_path(ps, pe)
elem["sw_path"] = str(path)
# 创建截面
sect = item.get("sect", {})
color = item.get("ckey")
# 使用follow_me创建几何体
if sect:
self._follow_me(
elem, sect, path if 'path' in locals() else None, color)
# 设置硬件属性
elem["sw_ckey"] = color
elem["sw_sect"] = str(sect)
elem["sw_ps"] = ps.to_s() if ps else "(0,0,0)"
elem["sw_pe"] = pe.to_s() if pe else "(0,0,0)"
# 应用硬件材质
self._apply_hardware_material(elem, item)
logger.info(f"✅ 简单硬件创建成功: {hardware_name}")
return elem
except Exception as e:
logger.error(f"创建简单硬件失败: {e}")
return None
# ==================== 硬件纹理处理方法 ====================
def _textured_hw(self, hw, selected):
"""为硬件应用纹理 - 从选择管理器迁移"""
try:
if not hw:
return
# 设置硬件的选择材质
color = "mat_select" if selected else "mat_hardware"
texture = material_manager.get_texture(color)
if texture and hasattr(hw, 'data') and hw.data:
if not hw.data.materials:
hw.data.materials.append(texture)
else:
hw.data.materials[0] = texture
# 设置硬件可见性
if hasattr(hw, 'hide_viewport'):
hw.hide_viewport = False # 硬件通常总是可见
except Exception as e:
logger.error(f"为硬件应用纹理失败: {e}")
def _apply_hardware_material(self, hardware, item):
"""应用硬件材质"""
try:
# 获取硬件材质
color_key = item.get("ckey", "mat_hardware")
material = material_manager.get_texture(color_key)
if material and hasattr(hardware, 'data') and hardware.data:
# 如果硬件有网格数据,应用材质
if not hardware.data.materials:
hardware.data.materials.append(material)
else:
hardware.data.materials[0] = material
else:
# 如果硬件没有网格数据,设置自定义属性
hardware["sw_material"] = color_key
except Exception as e:
logger.error(f"应用硬件材质失败: {e}")
# ==================== 几何体创建辅助方法 ====================
def _create_line_path(self, ps, pe):
"""创建线性路径"""
try:
if not ps or not pe:
return None
# 创建简单的线性路径
path_data = {
"type": "line",
"start": [ps.x, ps.y, ps.z],
"end": [pe.x, pe.y, pe.z],
"length": math.sqrt((pe.x - ps.x)**2 + (pe.y - ps.y)**2 + (pe.z - ps.z)**2)
}
return path_data
except Exception as e:
logger.error(f"创建线性路径失败: {e}")
return None
def _follow_me(self, container, surface, path, color):
"""Follow me操作 - 沿路径挤出截面"""
try:
if not BLENDER_AVAILABLE or not container:
return
# 这是一个简化的follow_me实现
# 在实际应用中需要根据具体的截面和路径数据实现
# 创建基本几何体作为占位符
if not container.data:
mesh = bpy.data.meshes.new(f"{container.name}_mesh")
# 创建简单的立方体作为占位符
vertices = [
(0, 0, 0), (1, 0, 0), (1, 1, 0), (0, 1, 0),
(0, 0, 1), (1, 0, 1), (1, 1, 1), (0, 1, 1)
]
edges = []
faces = [
(0, 1, 2, 3), (4, 7, 6, 5), (0, 4, 5, 1),
(1, 5, 6, 2), (2, 6, 7, 3), (3, 7, 4, 0)
]
mesh.from_pydata(vertices, edges, faces)
mesh.update()
container.data = mesh
logger.debug(f"Follow me操作完成: {container.name}")
except Exception as e:
logger.error(f"Follow me操作失败: {e}")
def _apply_transformation(self, obj, transformation):
"""应用变换到对象"""
try:
if not BLENDER_AVAILABLE or not obj or not transformation:
return
# 应用位置变换
if hasattr(transformation, 'origin'):
obj.location = (
transformation.origin.x,
transformation.origin.y,
transformation.origin.z
)
# 应用旋转变换(简化实现)
if hasattr(transformation, 'x_axis') and hasattr(transformation, 'y_axis'):
# 这里应该根据轴向量计算旋转,简化为默认旋转
pass
logger.debug(f"变换应用完成: {obj.name}")
except Exception as e:
logger.error(f"应用变换失败: {e}")
# ==================== 辅助方法 ====================
def _get_hardwares(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""获取硬件数据 - 使用data_manager"""
return self.data_manager.get_hardwares(data)
def _is_object_valid(self, obj) -> bool:
"""检查对象是否有效"""
try:
if not obj or not BLENDER_AVAILABLE:
return False
return obj.name in bpy.data.objects
except:
return False
def _delete_object_safe(self, obj) -> bool:
"""安全删除对象"""
try:
if not obj or not BLENDER_AVAILABLE:
return False
if obj.name in bpy.data.objects:
bpy.data.objects.remove(obj, do_unlink=True)
return True
return False
except Exception as e:
logger.error(f"删除硬件对象失败: {e}")
return False
# ==================== 硬件管理方法 ====================
def create_hardware_batch(self, data: Dict[str, Any]) -> int:
"""批量创建硬件"""
try:
items = data.get("items", [])
if not items:
return 0
logger.info(f"🔧 开始批量创建硬件: {len(items)}")
created_count = 0
for item in items:
try:
# 解析参数
ps = Point3d.parse(item.get("ps", "(0,0,0)"))
pe = Point3d.parse(item.get("pe", "(0,0,0)"))
file_path = item.get("file")
# 创建硬件
if file_path:
hardware = self._load_hardware_file(
file_path, item, ps, pe)
else:
hardware = self._create_simple_hardware(ps, pe, item)
if hardware:
created_count += 1
except Exception as e:
logger.error(f"创建单个硬件失败: {e}")
self.creation_stats["hardwares_created"] += created_count
logger.info(f"✅ 批量硬件创建完成: {created_count}/{len(items)} 成功")
return created_count
except Exception as e:
logger.error(f"批量创建硬件失败: {e}")
self.creation_stats["creation_errors"] += 1
return 0
def delete_hardware(self, uid: str, hw_id: int) -> bool:
"""删除单个硬件"""
try:
logger.info(f"🗑️ 删除硬件: uid={uid}, hw_id={hw_id}")
# 从本地存储中查找
if uid in self.hardwares and hw_id in self.hardwares[uid]:
hw_obj = self.hardwares[uid][hw_id]
if hw_obj and self._is_object_valid(hw_obj):
success = self._delete_object_safe(hw_obj)
if success:
del self.hardwares[uid][hw_id]
logger.info(f"✅ 硬件删除成功: uid={uid}, hw_id={hw_id}")
return True
logger.warning(f"硬件不存在或删除失败: uid={uid}, hw_id={hw_id}")
return False
except Exception as e:
logger.error(f"删除硬件失败: {e}")
return False
def delete_hardware_batch(self, uid: str, hw_ids: List[int]) -> int:
"""批量删除硬件"""
try:
deleted_count = 0
for hw_id in hw_ids:
if self.delete_hardware(uid, hw_id):
deleted_count += 1
logger.info(f"✅ 批量删除硬件完成: {deleted_count}/{len(hw_ids)} 成功")
return deleted_count
except Exception as e:
logger.error(f"批量删除硬件失败: {e}")
return 0
# ==================== 统计和管理方法 ====================
def get_hardware_stats(self) -> Dict[str, Any]:
"""获取硬件统计信息"""
try:
total_hardwares = sum(len(hw_dict)
for hw_dict in self.hardwares.values())
stats = {
"total_units": len(self.hardwares),
"total_hardwares": total_hardwares,
"creation_stats": self.creation_stats.copy(),
"hardware_types": {
"file_based": self.creation_stats["files_loaded"],
"simple_geometry": self.creation_stats["simple_hardwares"]
}
}
if BLENDER_AVAILABLE:
stats["blender_objects"] = len([obj for obj in bpy.data.objects
if obj.get("sw_typ") == "hw"])
return stats
except Exception as e:
logger.error(f"获取硬件统计失败: {e}")
return {"error": str(e)}
def get_creation_stats(self) -> Dict[str, Any]:
"""获取创建统计信息"""
return self.creation_stats.copy()
def reset_creation_stats(self):
"""重置创建统计"""
self.creation_stats = {
"hardwares_created": 0,
"files_loaded": 0,
"simple_hardwares": 0,
"creation_errors": 0
}
logger.info("硬件统计已重置")
def cleanup(self):
"""清理硬件管理器"""
try:
# 删除所有硬件对象
total_deleted = 0
for uid, hw_dict in self.hardwares.items():
for hw_id, hw_obj in hw_dict.items():
if self._delete_object_safe(hw_obj):
total_deleted += 1
self.hardwares.clear()
self.reset_creation_stats()
logger.info(f"✅ 硬件管理器清理完成,删除了 {total_deleted} 个对象")
except Exception as e:
logger.error(f"清理硬件管理器失败: {e}")
def get_hardware_by_uid(self, uid: str) -> Dict[str, Any]:
"""根据UID获取硬件"""
return self.hardwares.get(uid, {})
def get_all_hardwares(self) -> Dict[str, Dict[str, Any]]:
"""获取所有硬件"""
return self.hardwares.copy()
# ==================== 全局硬件管理器实例 ====================
# 全局实例
hardware_manager = HardwareManager()
def init_hardware_manager():
"""初始化全局硬件管理器实例 - 不再需要suw_impl参数"""
global hardware_manager
hardware_manager = HardwareManager()
return hardware_manager
def get_hardware_manager():
"""获取全局硬件管理器实例"""
return hardware_manager