blenderpython/suw_core/explosion_manager.py

778 lines
32 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SUW Core - Explosion Manager Module
拆分自: suw_impl.py (Line 583-602)
用途: 炸开柜体功能、区域和零件移动、零件序列文本显示
版本: 1.0.0
作者: SUWood Team
"""
from .geometry_utils import Point3d, Vector3d
from .memory_manager import memory_manager
from .data_manager import data_manager, get_data_manager
import math
import logging
from typing import Dict, Any, Optional, List
# 设置日志
logger = logging.getLogger(__name__)
# 检查Blender可用性
try:
import bpy
BLENDER_AVAILABLE = True
except ImportError:
BLENDER_AVAILABLE = False
# ==================== 炸开管理器类 ====================
class ExplosionManager:
"""炸开管理器 - 负责炸开柜体相关操作"""
def __init__(self):
"""
初始化炸开管理器 - 完全独立不依赖suw_impl
"""
# 使用全局数据管理器
self.data_manager = get_data_manager()
# 标签对象
self.labels = None
self.door_labels = None
logger.info("ExplosionManager 初始化完成")
# ==================== 核心命令方法 ====================
def c0e(self, data: Dict[str, Any]):
"""explode_zones - 炸开柜体 - 按照Ruby逻辑实现"""
try:
if not BLENDER_AVAILABLE:
logger.warning("Blender 不可用,跳过炸开柜体操作")
return 0
uid = data.get("uid")
zones = data.get("zones", [])
parts = data.get("parts", [])
explode = data.get("explode", False)
logger.info(
f" 开始炸开柜体: uid={uid}, 区域数={len(zones)}, 零件数={len(parts)}, 显示序列={explode}")
# 初始化标签对象
self._init_labels()
# 处理区域移动
zones_moved = self._move_zones(uid, zones)
# 处理零件移动
parts_moved = self._move_parts(uid, parts)
# 处理零件序列文本显示
if explode:
texts_created = self._create_part_sequence_texts(uid)
else:
# 【修复】当explode=False时删除之前创建的文本标签
texts_deleted = self._delete_part_sequence_texts()
texts_created = -texts_deleted # 负数表示删除的数量
logger.info(
f"✅ 炸开柜体完成: 区域移动={zones_moved}, 零件移动={parts_moved}, 文本操作={texts_created}")
return zones_moved + parts_moved + texts_created
except Exception as e:
logger.error(f"❌ 炸开柜体失败: {e}")
return 0
def c0d(self, data: Dict[str, Any]):
"""parts_seqs - 设置零件序列信息 - 按照Ruby逻辑实现"""
try:
if not BLENDER_AVAILABLE:
logger.warning("Blender 不可用,跳过零件序列设置")
return 0
uid = data.get("uid")
seqs = data.get("seqs", [])
logger.info(f" 开始设置零件序列信息: uid={uid}, 序列数={len(seqs)}")
parts_data = self.data_manager.get_parts({"uid": uid})
set_count = 0
# 【按照Ruby逻辑】处理每个序列项
for seq_data in seqs:
try:
root = seq_data.get("cp") # 部件id
seq = seq_data.get("seq") # 顺序号
pos = seq_data.get("pos") # 位置
name = seq_data.get("name") # 板件名称(可选)
size = seq_data.get("size") # 尺寸即长*宽*厚(可选)
mat = seq_data.get("mat") # 材料(包括材质/颜色)(可选)
if not root or seq is None or pos is None:
logger.warning(
f"跳过无效序列数据: root={root}, seq={seq}, pos={pos}")
continue
# 【按照Ruby逻辑】查找对应的零件
if root in parts_data:
part = parts_data[root]
if part and hasattr(part, 'get'):
# 【修复】使用sw_前缀的属性与c0e命令保持一致
part["sw_seq"] = seq
part["sw_pos"] = pos
# 设置可选属性
if name:
part["sw_name"] = name
if size:
part["sw_size"] = size
if mat:
part["sw_mat"] = mat
set_count += 1
logger.debug(
f"设置零件序列: cp={root}, seq={seq}, pos={pos}, name={name}, size={size}, mat={mat}")
else:
logger.warning(f"零件对象无效: cp={root}")
else:
logger.warning(f"未找到零件: cp={root}")
except Exception as e:
logger.error(f"处理序列项失败: {e}")
continue
logger.info(f"✅ 零件序列信息设置完成: {set_count}")
return set_count
except Exception as e:
logger.error(f"❌ 设置零件序列信息失败: {e}")
return 0
# ==================== 私有方法 ====================
def _init_labels(self):
"""初始化标签对象"""
try:
if not self.labels:
# 创建标签组
self.labels = bpy.data.objects.new("SUW_Labels", None)
self.labels.empty_display_type = 'PLAIN_AXES'
bpy.context.scene.collection.objects.link(self.labels)
if not self.door_labels:
# 创建门板标签组
self.door_labels = bpy.data.objects.new("SUW_DoorLabels", None)
self.door_labels.empty_display_type = 'PLAIN_AXES'
bpy.context.scene.collection.objects.link(self.door_labels)
except Exception as e:
logger.error(f"初始化标签对象失败: {e}")
def _move_zones(self, uid: str, zones: List[Dict[str, Any]]) -> int:
"""移动区域"""
try:
moved_count = 0
zones_data = self.data_manager.get_zones({"uid": uid})
for zone_data in zones:
zid = zone_data.get("zid")
vec_str = zone_data.get("vec", "(0,0,0)")
if zid in zones_data:
zone = zones_data[zid]
if zone and hasattr(zone, 'location'):
# 解析偏移向量
offset = Vector3d.parse(vec_str)
if offset:
# 应用单位变换
if uid in self.data_manager.unit_trans:
trans = self.data_manager.unit_trans[uid]
offset = self._transform_vector(offset, trans)
# 移动区域
zone.location.x += offset.x # Vector3d.parse已经转换过了
zone.location.y += offset.y
zone.location.z += offset.z
moved_count += 1
logger.debug(f"移动区域: zid={zid}, 偏移={vec_str}")
return moved_count
except Exception as e:
logger.error(f"移动区域失败: {e}")
return 0
def _move_parts(self, uid: str, parts: List[Dict[str, Any]]) -> int:
"""移动零件 - 按照Ruby逻辑匹配零件"""
try:
moved_count = 0
parts_data = self.data_manager.get_parts({"uid": uid})
hardwares_data = self.data_manager.get_hardwares({"uid": uid})
logger.debug(
f"开始移动零件: 零件数据={len(parts_data)}, 五金数据={len(hardwares_data)}")
# 【修复】将集合移到外层,避免重复移动
moved_parts = set() # 记录已移动的零件,避免重复移动
moved_hardwares = set() # 记录已移动的五金件,避免重复移动
for part_data in parts:
pid = part_data.get("pid")
vec_str = part_data.get("vec", "(0,0,0)")
logger.debug(f"处理零件移动: pid={pid}, vec={vec_str}")
# 解析偏移向量
offset = Vector3d.parse(vec_str)
if not offset:
logger.warning(f"无法解析偏移向量: {vec_str}")
continue
# 应用单位变换
if uid in self.data_manager.unit_trans:
trans = self.data_manager.unit_trans[uid]
offset = self._transform_vector(offset, trans)
# 【新增】详细调试信息
matched_parts = []
matched_hardwares = []
# 【修复】按照Ruby逻辑匹配零件 - 通过pid属性匹配
for root, part in parts_data.items():
if not part:
continue
# 获取零件的pid属性
part_pid = self._get_part_attribute(part, "pid", -1)
# 【新增】详细调试信息
if part_pid == pid:
matched_parts.append(root)
# logger.info(
# f"比较: 目标pid={pid}, 零件pid={part_pid}, 零件键={root}")
if part_pid == pid and root not in moved_parts:
# 【修复】对于门板零件,需要特殊处理
# 检查是否是门板类型通过layer属性或其他标识
part_layer = self._get_part_attribute(part, "layer", 0)
part_name = self._get_part_attribute(part, "name", "")
# 如果是门板层(layer=1)或者零件名称包含"门",则允许移动
is_door_part = (
part_layer == 1 or "" in str(part_name))
if is_door_part:
# 移动零件 - Vector3d.parse已经进行了单位转换
if hasattr(part, 'location'):
# 【新增】记录移动前的位置
old_location = (part.location.x,
part.location.y, part.location.z)
# 【修复】确保位置计算正确,避免浮点数精度问题
new_x = part.location.x + offset.x
new_y = part.location.y + offset.y
new_z = part.location.z + offset.z
# 应用新位置
part.location.x = new_x
part.location.y = new_y
part.location.z = new_z
moved_count += 1
moved_parts.add(root) # 标记为已移动
# 【新增】详细的位置变化信息
new_location = (part.location.x,
part.location.y, part.location.z)
logger.info(
f"✅ 移动门板零件成功: pid={pid}, root={root}, layer={part_layer}, name={part_name}, 偏移={vec_str}")
else:
logger.warning(
f"零件对象没有location属性: pid={pid}, root={root}")
else:
# 对于非门板零件检查是否已经移动过相同pid的零件
pid_already_moved = any(
self._get_part_attribute(p, "pid", -1) == pid
for p in [parts_data.get(r) for r in moved_parts if parts_data.get(r)]
)
if pid_already_moved:
logger.info(
f"⚠️ 跳过重复pid的非门板零件: pid={pid}, root={root} (已移动过相同pid的零件)")
continue
# 移动非门板零件
if hasattr(part, 'location'):
# 【新增】记录移动前的位置
old_location = (part.location.x,
part.location.y, part.location.z)
# 【修复】确保位置计算正确,避免浮点数精度问题
new_x = part.location.x + offset.x
new_y = part.location.y + offset.y
new_z = part.location.z + offset.z
# 应用新位置
part.location.x = new_x
part.location.y = new_y
part.location.z = new_z
moved_count += 1
moved_parts.add(root) # 标记为已移动
# 【新增】详细的位置变化信息
new_location = (part.location.x,
part.location.y, part.location.z)
logger.info(
f"✅ 移动非门板零件成功: pid={pid}, root={root}, layer={part_layer}, name={part_name}, 偏移={vec_str}")
else:
logger.warning(
f"零件对象没有location属性: pid={pid}, root={root}")
# 【修复】按照Ruby逻辑匹配五金件 - 通过pid属性匹配
for root, hardware in hardwares_data.items():
if not hardware:
continue
# 获取五金件的pid属性
hw_pid = self._get_part_attribute(hardware, "pid", -1)
# 【新增】详细调试信息
if hw_pid == pid:
matched_hardwares.append(root)
# logger.info(
# f"比较: 目标pid={pid}, 五金pid={hw_pid}, 五金键={root}")
if hw_pid == pid and root not in moved_hardwares:
# 【修复】检查是否已经移动过相同pid的五金件
hw_pid_already_moved = any(
self._get_part_attribute(hw, "pid", -1) == pid
for hw in [hardwares_data.get(r) for r in moved_hardwares if hardwares_data.get(r)]
)
if hw_pid_already_moved:
logger.info(
f"⚠️ 跳过重复pid的五金件: pid={pid}, root={root} (已移动过相同pid的五金件)")
continue
# 移动五金件 - Vector3d.parse已经进行了单位转换
if hasattr(hardware, 'location'):
# 【新增】记录移动前的位置
old_location = (
hardware.location.x, hardware.location.y, hardware.location.z)
# 【修复】Vector3d.parse已经转换过了不需要再次转换
hardware.location.x += offset.x
hardware.location.y += offset.y
hardware.location.z += offset.z
moved_count += 1
moved_hardwares.add(root) # 标记为已移动
# 【新增】详细的位置变化信息
new_location = (
hardware.location.x, hardware.location.y, hardware.location.z)
logger.info(
f"✅ 移动五金件成功: pid={pid}, root={root}, 偏移={vec_str}")
else:
logger.warning(
f"五金件对象没有location属性: pid={pid}, root={root}")
# 【新增】总结匹配结果
logger.info(
f"📊 pid={pid}匹配结果: 零件={len(matched_parts)}, 五金件={len(matched_hardwares)}")
# 【新增】强制更新视图
try:
if BLENDER_AVAILABLE:
bpy.context.view_layer.update()
logger.debug("视图已更新")
except Exception as e:
logger.debug(f"视图更新失败: {e}")
logger.info(f"零件移动完成: 移动了 {moved_count} 个对象")
return moved_count
except Exception as e:
logger.error(f"移动零件失败: {e}")
return 0
def _create_part_sequence_texts(self, uid: str) -> int:
"""创建零件序列文本 - 修复属性访问"""
try:
created_count = 0
parts_data = self.data_manager.get_parts({"uid": uid})
logger.debug(f"开始创建零件序列文本: 零件数={len(parts_data)}")
for root, part in parts_data.items():
if not part:
continue
# 【修复】使用统一的属性获取方法
pos = self._get_part_attribute(part, "pos", 1)
seq = self._get_part_attribute(part, "seq", 0)
layer = self._get_part_attribute(part, "layer", 0)
logger.debug(
f"零件属性: root={root}, seq={seq}, pos={pos}, layer={layer}")
if seq <= 0:
continue
# 获取零件位置
center = None
if hasattr(part, 'location'):
center = part.location
else:
logger.warning(f"零件没有位置信息: root={root}, seq={seq}")
continue
# 计算文本位置和方向
vector = self._get_position_vector(pos)
if not vector:
continue
# 应用单位变换
if uid in self.data_manager.unit_trans:
trans = self.data_manager.unit_trans[uid]
vector = self._transform_vector(vector, trans)
# 计算文本位置
text_location = (
center.x + vector.x * 0.1, # 100mm偏移
center.y + vector.y * 0.1,
center.z + vector.z * 0.1
)
# 创建文本对象
text_obj = self._create_text_object(str(seq), text_location)
if text_obj:
# 设置材质为红色
self._add_red_material(text_obj)
# 根据图层决定父对象
if layer == 1: # 门板层
text_obj.parent = self.door_labels
else:
text_obj.parent = self.labels
created_count += 1
logger.debug(
f"创建零件序列文本: seq={seq}, pos={pos}, root={root}")
return created_count
except Exception as e:
logger.error(f"创建零件序列文本失败: {e}")
return 0
def _delete_part_sequence_texts(self) -> int:
"""删除零件序列文本"""
try:
deleted_count = 0
# 【修复】直接通过名称删除固定的标签集合
# 删除 SUW_Labels 集合及其所有子对象
suw_labels = bpy.data.objects.get("SUW_Labels")
if suw_labels:
# 【修复】先收集所有子对象名称,再逐个删除
children_to_delete = []
for child in suw_labels.children:
if child.type == 'FONT':
children_to_delete.append(child.name)
# 逐个删除子对象
for child_name in children_to_delete:
child = bpy.data.objects.get(child_name)
if child:
try:
bpy.data.objects.remove(child, do_unlink=True)
deleted_count += 1
logger.debug(f"删除文本对象: {child_name}")
except Exception as e:
logger.warning(f"删除文本对象失败: {child_name}, {e}")
# 删除父对象
try:
bpy.data.objects.remove(suw_labels, do_unlink=True)
logger.debug("删除SUW_Labels集合")
except Exception as e:
logger.warning(f"删除SUW_Labels集合失败: {e}")
# 删除 SUW_DoorLabels 集合及其所有子对象
suw_door_labels = bpy.data.objects.get("SUW_DoorLabels")
if suw_door_labels:
# 【修复】先收集所有子对象名称,再逐个删除
children_to_delete = []
for child in suw_door_labels.children:
if child.type == 'FONT':
children_to_delete.append(child.name)
# 逐个删除子对象
for child_name in children_to_delete:
child = bpy.data.objects.get(child_name)
if child:
try:
bpy.data.objects.remove(child, do_unlink=True)
deleted_count += 1
logger.debug(f"删除门板文本对象: {child_name}")
except Exception as e:
logger.warning(f"删除门板文本对象失败: {child_name}, {e}")
# 删除父对象
try:
bpy.data.objects.remove(suw_door_labels, do_unlink=True)
logger.debug("删除SUW_DoorLabels集合")
except Exception as e:
logger.warning(f"删除SUW_DoorLabels集合失败: {e}")
# 【新增】清理场景中可能残留的文本对象
# 搜索并删除所有以"Text_"开头的对象
# 【修复】使用更安全的方式遍历和删除对象
text_objects_to_delete = []
for obj in bpy.data.objects:
if obj.name.startswith("Text_") and obj.type == 'FONT':
text_objects_to_delete.append(obj.name)
for obj_name in text_objects_to_delete:
obj = bpy.data.objects.get(obj_name)
if obj:
try:
bpy.data.objects.remove(obj, do_unlink=True)
deleted_count += 1
logger.debug(f"删除残留文本对象: {obj_name}")
except Exception as e:
logger.warning(f"删除残留文本对象失败: {obj_name}, {e}")
# 【新增】强制更新视图
try:
if BLENDER_AVAILABLE:
bpy.context.view_layer.update()
logger.debug("视图已更新")
except Exception as e:
logger.debug(f"视图更新失败: {e}")
# 【修复】彻底清理内部引用和相关数据
# 重置内部引用
self.labels = None
self.door_labels = None
# 【新增】清理可能残留的引用
# 检查并清理场景中可能残留的引用
# 【修复】使用更安全的方式检查对象是否存在
for obj_name in ["SUW_Labels", "SUW_DoorLabels"]:
obj = bpy.data.objects.get(obj_name)
if obj:
try:
bpy.data.objects.remove(obj, do_unlink=True)
logger.debug(f"清理残留引用: {obj_name}")
except Exception as e:
logger.debug(f"清理残留引用失败: {obj_name}, {e}")
# 【新增】清理材质数据
# 删除可能残留的红色文本材质
# 【修复】使用更安全的方式清理材质
red_text_material = bpy.data.materials.get("Red_Text")
if red_text_material:
try:
bpy.data.materials.remove(red_text_material)
logger.debug("清理红色文本材质")
except Exception as e:
logger.debug(f"清理材质失败: {e}")
# 【新增】清理曲线数据
# 删除可能残留的文本曲线
# 【修复】使用更安全的方式清理曲线
curves_to_delete = []
for curve in bpy.data.curves:
if curve.name.startswith("Text_"):
curves_to_delete.append(curve.name)
for curve_name in curves_to_delete:
curve = bpy.data.curves.get(curve_name)
if curve:
try:
bpy.data.curves.remove(curve)
logger.debug(f"清理文本曲线: {curve_name}")
except Exception as e:
logger.debug(f"清理曲线失败: {curve_name}, {e}")
logger.info(f"✅ 删除零件序列文本: {deleted_count}")
return deleted_count
except Exception as e:
logger.error(f"❌ 删除零件序列文本失败: {e}")
return 0
def _get_position_vector(self, pos: int) -> Optional[Vector3d]:
"""根据位置获取方向向量"""
try:
if pos == 1: # F - 前面
return Vector3d(0, -1, 0)
elif pos == 2: # K - 后面
return Vector3d(0, 1, 0)
elif pos == 3: # L - 左面
return Vector3d(-1, 0, 0)
elif pos == 4: # R - 右面
return Vector3d(1, 0, 0)
elif pos == 5: # B - 底面
return Vector3d(0, 0, -1)
elif pos == 6: # T - 顶面
return Vector3d(0, 0, 1)
else:
return Vector3d(0, 0, 1) # 默认向上
except Exception as e:
logger.error(f"获取位置向量失败: {e}")
return None
def _create_text_object(self, text: str, location: tuple) -> Optional[Any]:
"""创建文本对象"""
try:
# 创建文本曲线
text_curve = bpy.data.curves.new(type="FONT", name=f"Text_{text}")
text_curve.body = text
text_curve.size = 0.05 # 5cm字体大小
# 创建文本对象
text_obj = bpy.data.objects.new(f"Text_{text}", text_curve)
text_obj.location = location
# 添加到场景
bpy.context.scene.collection.objects.link(text_obj)
return text_obj
except Exception as e:
logger.error(f"创建文本对象失败: {e}")
return None
def _add_red_material(self, obj):
"""添加红色材质到对象"""
try:
# 创建红色材质
mat = bpy.data.materials.new(name="Red_Text")
mat.use_nodes = True
nodes = mat.node_tree.nodes
nodes.clear()
# 创建发射节点
emission = nodes.new(type='ShaderNodeEmission')
emission.inputs[0].default_value = (1, 0, 0, 1) # 红色
emission.inputs[1].default_value = 1.0 # 强度
# 创建输出节点
output = nodes.new(type='ShaderNodeOutputMaterial')
# 连接节点
mat.node_tree.links.new(emission.outputs[0], output.inputs[0])
# 应用材质到对象
if obj.data.materials:
obj.data.materials[0] = mat
else:
obj.data.materials.append(mat)
except Exception as e:
logger.error(f"添加红色材质失败: {e}")
def _transform_vector(self, vector: Vector3d, transform) -> Vector3d:
"""变换向量"""
try:
if not transform:
return vector
# 简化的变换实现
# 这里应该根据实际的变换矩阵进行计算
# 暂时返回原始向量
return vector
except Exception as e:
logger.error(f"变换向量失败: {e}")
return vector
def _get_part_attribute(self, obj, attr_name: str, default_value=None):
"""获取零件属性 - 支持多种对象类型"""
try:
# 【修复】优先检查sw_前缀的属性
if hasattr(obj, 'get'):
# 如果是字典或类似对象
sw_attr_name = f"sw_{attr_name}"
if sw_attr_name in obj:
return obj[sw_attr_name]
# 回退到sw字典
return obj.get("sw", {}).get(attr_name, default_value)
elif hasattr(obj, 'sw'):
# 如果有sw属性
sw_attr_name = f"sw_{attr_name}"
if hasattr(obj, sw_attr_name):
return getattr(obj, sw_attr_name)
return obj.sw.get(attr_name, default_value)
elif isinstance(obj, dict):
# 如果是字典
sw_attr_name = f"sw_{attr_name}"
if sw_attr_name in obj:
return obj[sw_attr_name]
return obj.get("sw", {}).get(attr_name, default_value)
else:
# 尝试从Blender对象的自定义属性获取
try:
sw_attr_name = f"sw_{attr_name}"
if hasattr(obj, sw_attr_name):
return getattr(obj, sw_attr_name)
elif hasattr(obj, 'sw'):
return obj.sw.get(attr_name, default_value)
elif hasattr(obj, 'get'):
return obj.get("sw", {}).get(attr_name, default_value)
except:
pass
return default_value
except Exception as e:
logger.debug(f"获取零件属性失败: {e}")
return default_value
# ==================== 管理器统计 ====================
def get_explosion_stats(self) -> Dict[str, Any]:
"""获取炸开管理器统计信息"""
try:
stats = {
"manager_type": "ExplosionManager",
"labels_created": self.labels is not None,
"door_labels_created": self.door_labels is not None,
"blender_available": BLENDER_AVAILABLE
}
return stats
except Exception as e:
logger.error(f"获取炸开管理器统计失败: {e}")
return {"error": str(e)}
# ==================== 模块实例 ====================
# 全局实例将由SUWImpl初始化时设置
explosion_manager = None
def init_explosion_manager():
"""初始化炸开管理器 - 不再需要suw_impl参数"""
global explosion_manager
explosion_manager = ExplosionManager()
return explosion_manager
def get_explosion_manager():
"""获取炸开管理器实例"""
global explosion_manager
if explosion_manager is None:
explosion_manager = init_explosion_manager()
return explosion_manager