blenderpython/suw_core/data_manager.py

670 lines
24 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SUW Core - Data Manager Module
数据管理中心 - 统一管理所有共用数据
用途: 替代Ruby中的@zones、@parts、@hardwares等全局变量
版本: 1.0.0
作者: SUWood Team
"""
import logging
import threading
from typing import Dict, Any, Optional, List
# 设置日志
logger = logging.getLogger(__name__)
# 检查Blender可用性
try:
import bpy
BLENDER_AVAILABLE = True
except ImportError:
BLENDER_AVAILABLE = False
# ==================== 数据管理器类 ====================
class DataManager:
"""数据管理器 - 统一管理所有SUWood数据"""
def __init__(self):
"""初始化数据管理器"""
# 核心数据存储 - 对应Ruby的实例变量
self.zones = {} # @zones - {uid: {zone_id: zone_data}}
self.parts = {} # @parts - {uid: {part_id: part_data}}
self.hardwares = {} # @hardwares - {uid: {hw_id: hw_data}}
self.labels = {} # @labels - {uid: {label_id: label_data}}
self.door_labels = {} # @door_labels - {uid: {door_label_id: door_label_data}}
self.machinings = {} # @machinings - {uid: [machining_list]}
self.dimensions = {} # @dimensions - {uid: [dimension_list]}
# 单元参数 - 对应Ruby的@unit_param和@unit_trans
self.unit_params = {} # @unit_param - {uid: params}
self.unit_trans = {} # @unit_trans - {uid: transformation}
# 材质和纹理 - 对应Ruby的@textures
self.textures = {} # @textures - {ckey: material}
# 系统状态
self.part_mode = False # @part_mode
self.hide_none = False # @hide_none
self.mat_type = 0 # @mat_type (MAT_TYPE_NORMAL)
self.back_material = False # @back_material
self.added_contour = False # @added_contour
# 选择状态 - 对应Ruby的类变量
self.selected_uid = None # @@selected_uid
self.selected_obj = None # @@selected_obj
self.selected_zone = None # @@selected_zone
self.selected_part = None # @@selected_part
self.scaled_zone = None # @@scaled_zone
# 选择集合
self.selected_faces = [] # @selected_faces
self.selected_parts = [] # @selected_parts
self.selected_hws = [] # @selected_hws
# Blender对象引用
self.labels_group = None # @labels - Blender组对象
self.door_labels_group = None # @door_labels - Blender组对象
self.door_layer = None # @door_layer
self.drawer_layer = None # @drawer_layer
self.default_zone = None # @@default_zone
# 线程安全
self.lock = threading.Lock()
logger.info("✅ 数据管理器初始化完成")
# ==================== Zones 数据管理 ====================
def get_zones(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""获取区域数据 - 对应Ruby的get_zones方法"""
uid = data.get("uid")
if not uid:
return {}
with self.lock:
if uid not in self.zones:
self.zones[uid] = {}
return self.zones[uid]
def add_zone(self, uid: str, zid: Any, zone_obj: Any):
"""添加区域"""
with self.lock:
if uid not in self.zones:
self.zones[uid] = {}
self.zones[uid][zid] = zone_obj
logger.debug(f"添加区域: uid={uid}, zid={zid}")
def remove_zone(self, uid: str, zid: Any) -> bool:
"""删除区域"""
with self.lock:
if uid in self.zones and zid in self.zones[uid]:
del self.zones[uid][zid]
logger.debug(f"删除区域: uid={uid}, zid={zid}")
return True
return False
def clear_zones(self, uid: str):
"""清空指定uid的所有区域"""
with self.lock:
if uid in self.zones:
del self.zones[uid]
logger.debug(f"清空区域: uid={uid}")
# ==================== Parts 数据管理 ====================
def get_parts(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""获取部件数据 - 对应Ruby的get_parts方法"""
uid = data.get("uid")
if not uid:
return {}
with self.lock:
if uid not in self.parts:
self.parts[uid] = {}
return self.parts[uid]
def add_part(self, uid: str, part_id: Any, part_obj: Any):
"""添加部件"""
with self.lock:
if uid not in self.parts:
self.parts[uid] = {}
self.parts[uid][part_id] = part_obj
logger.debug(f"添加部件: uid={uid}, part_id={part_id}")
def remove_part(self, uid: str, part_id: Any) -> bool:
"""删除部件"""
with self.lock:
if uid in self.parts and part_id in self.parts[uid]:
del self.parts[uid][part_id]
logger.debug(f"删除部件: uid={uid}, part_id={part_id}")
return True
return False
def clear_parts(self, uid: str):
"""清空指定uid的所有部件"""
with self.lock:
if uid in self.parts:
del self.parts[uid]
logger.debug(f"清空部件: uid={uid}")
# ==================== Hardwares 数据管理 ====================
def get_hardwares(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""获取硬件数据 - 对应Ruby的get_hardwares方法"""
uid = data.get("uid")
if not uid:
return {}
with self.lock:
if uid not in self.hardwares:
self.hardwares[uid] = {}
return self.hardwares[uid]
def add_hardware(self, uid: str, hw_id: Any, hw_obj: Any):
"""添加硬件"""
with self.lock:
if uid not in self.hardwares:
self.hardwares[uid] = {}
self.hardwares[uid][hw_id] = hw_obj
logger.debug(f"添加硬件: uid={uid}, hw_id={hw_id}")
def remove_hardware(self, uid: str, hw_id: Any) -> bool:
"""删除硬件"""
with self.lock:
if uid in self.hardwares and hw_id in self.hardwares[uid]:
del self.hardwares[uid][hw_id]
logger.debug(f"删除硬件: uid={uid}, hw_id={hw_id}")
return True
return False
def clear_hardwares(self, uid: str):
"""清空指定uid的所有硬件"""
with self.lock:
if uid in self.hardwares:
del self.hardwares[uid]
logger.debug(f"清空硬件: uid={uid}")
# ==================== Labels 数据管理 ====================
def get_labels(self, uid: str) -> Dict[str, Any]:
"""获取标签数据"""
with self.lock:
if uid not in self.labels:
self.labels[uid] = {}
return self.labels[uid]
def get_door_labels(self, uid: str) -> Dict[str, Any]:
"""获取门标签数据"""
with self.lock:
if uid not in self.door_labels:
self.door_labels[uid] = {}
return self.door_labels[uid]
# ==================== Machinings 数据管理 ====================
def get_machinings(self, uid: str) -> List[Any]:
"""获取加工数据"""
with self.lock:
if uid not in self.machinings:
self.machinings[uid] = []
return self.machinings[uid]
def add_machining(self, uid: str, machining_obj: Any):
"""添加加工"""
with self.lock:
if uid not in self.machinings:
self.machinings[uid] = []
self.machinings[uid].append(machining_obj)
logger.debug(f"添加加工: uid={uid}")
def clear_machinings(self, uid: str):
"""清空指定uid的所有加工"""
with self.lock:
if uid in self.machinings:
self.machinings[uid].clear()
logger.debug(f"清空加工: uid={uid}")
def cleanup_machinings(self, uid: str):
"""清理指定uid的已删除加工对象"""
with self.lock:
if uid in self.machinings:
# 移除已删除的对象
self.machinings[uid] = [
machining for machining in self.machinings[uid]
if machining and self._is_entity_valid(machining)
]
logger.debug(
f"清理加工: uid={uid}, 剩余 {len(self.machinings[uid])} 个对象")
# ==================== Dimensions 数据管理 ====================
def get_dimensions(self, uid: str) -> List[Any]:
"""获取尺寸标注数据"""
with self.lock:
if uid not in self.dimensions:
self.dimensions[uid] = []
return self.dimensions[uid]
def add_dimension(self, uid: str, dimension_obj: Any):
"""添加尺寸标注"""
with self.lock:
if uid not in self.dimensions:
self.dimensions[uid] = []
self.dimensions[uid].append(dimension_obj)
logger.debug(f"添加尺寸标注: uid={uid}")
def clear_dimensions(self, uid: str):
"""清空指定uid的所有尺寸标注"""
with self.lock:
if uid in self.dimensions:
del self.dimensions[uid]
logger.debug(f"清空尺寸标注: uid={uid}")
# ==================== 材质管理 ====================
def get_texture(self, key: str) -> Any:
"""获取材质 - 对应Ruby的get_texture方法"""
if key and key in self.textures:
return self.textures[key]
return self.textures.get("mat_default")
def add_texture(self, key: str, material: Any):
"""添加材质"""
with self.lock:
self.textures[key] = material
logger.debug(f"添加材质: key={key}")
# ==================== 选择管理 ====================
def sel_clear(self):
"""清除所有选择 - 对应Ruby的sel_clear方法"""
with self.lock:
self.selected_uid = None
self.selected_obj = None
self.selected_zone = None
self.selected_part = None
self.selected_faces.clear()
self.selected_parts.clear()
self.selected_hws.clear()
logger.debug("清除所有选择")
def set_selected(self, uid: str, obj: Any, zone: Any = None, part: Any = None):
"""设置选择状态"""
with self.lock:
self.selected_uid = uid
self.selected_obj = obj
if zone:
self.selected_zone = zone
if part:
self.selected_part = part
logger.debug(f"设置选择: uid={uid}, obj={obj}")
# ==================== 删除实体的核心实现 ====================
def del_entities_by_type(self, entities: Dict[str, Any], typ: str, oid: int) -> int:
"""按类型删除实体 - 修复版本,确保删除所有相关对象"""
if not entities:
return 0
deleted_count = 0
entities_to_delete = []
# 【修复1】收集数据结构中需要删除的实体
for key, entity in entities.items():
if entity and self._is_entity_valid(entity):
# 对应Ruby逻辑: typ == "uid" || entity.get_attribute("sw", typ) == oid
if typ == "uid" or self._get_entity_attribute(entity, typ) == oid:
entities_to_delete.append(key)
# 【修复2】删除数据结构中的实体
for key in entities_to_delete:
entity = entities[key]
if self._delete_entity_safe(entity):
del entities[key]
deleted_count += 1
# 【修复3】遍历Blender中的所有对象查找并删除相关对象
if BLENDER_AVAILABLE:
blender_deleted_count = self._delete_blender_objects_by_type(
typ, oid)
deleted_count += blender_deleted_count
logger.debug(
f"从Blender中删除对象: typ={typ}, oid={oid}, 删除数量={blender_deleted_count}")
logger.debug(f"按类型删除实体: typ={typ}, oid={oid}, 总删除数量={deleted_count}")
return deleted_count
def _matches_delete_condition(self, entity, typ: str, oid: int, uid: str = None) -> bool:
"""检查实体是否匹配删除条件 - 添加详细调试"""
try:
if not entity or not hasattr(entity, 'get'):
return False
# 【调试】打印实体的所有属性
entity_uid = entity.get("sw_uid")
entity_typ_value = entity.get(f"sw_{typ}")
logger.debug(
f"🔍 检查删除条件: {entity.name if hasattr(entity, 'name') else 'unknown'}")
logger.debug(
f" 实体属性: sw_uid={entity_uid}, sw_{typ}={entity_typ_value}")
logger.debug(f" 删除条件: uid={uid}, typ={typ}, oid={oid}")
# 【修复】正确的删除条件逻辑
if typ == "uid":
# 删除整个单元检查sw_uid
uid_matches = entity_uid == oid
logger.debug(f" uid删除匹配: {uid_matches}")
return uid_matches
else:
# 删除特定类型需要同时匹配uid和对应的类型属性
uid_matches = uid is None or entity_uid == uid
typ_matches = entity_typ_value == oid
logger.debug(
f" 类型删除匹配: uid匹配={uid_matches}, {typ}匹配={typ_matches}")
# 必须同时匹配uid和类型值
return uid_matches and typ_matches
except Exception as e:
logger.error(f"检查删除条件时发生错误: {e}")
return False
def _delete_blender_objects_by_type(self, typ: str, oid: int, uid: str = None) -> int:
"""从Blender中删除指定类型的对象 - 添加详细调试"""
deleted_count = 0
try:
logger.info(f" 开始搜索Blender对象: typ={typ}, oid={oid}, uid={uid}")
# 遍历所有Blender对象
objects_to_delete = []
checked_objects = []
for obj in bpy.data.objects:
checked_objects.append(obj.name)
if self._should_delete_blender_object(obj, typ, oid, uid):
objects_to_delete.append(obj)
logger.info(f"🎯 标记删除: {obj.name}")
logger.info(
f"📊 检查了 {len(checked_objects)} 个对象,标记删除 {len(objects_to_delete)}")
# 删除收集到的对象
for obj in objects_to_delete:
try:
logger.info(
f" 删除Blender对象: {obj.name}, typ={typ}, oid={oid}, uid={uid}")
bpy.data.objects.remove(obj, do_unlink=True)
deleted_count += 1
except Exception as e:
logger.error(f"删除Blender对象失败: {obj.name}, 错误: {e}")
# 清理孤立的网格数据
self._cleanup_orphaned_meshes()
except Exception as e:
logger.error(f"删除Blender对象时发生错误: {e}")
return deleted_count
def _should_delete_blender_object(self, obj, typ: str, oid: int, uid: str = None) -> bool:
"""判断是否应该删除Blender对象 - 添加详细调试"""
try:
if not obj or not hasattr(obj, 'get'):
return False
# 【调试】打印对象的所有sw_属性
sw_attrs = {}
for key, value in obj.items():
if key.startswith('sw_'):
sw_attrs[key] = value
logger.debug(f"🔍 检查对象 {obj.name} 的sw_属性: {sw_attrs}")
# 使用相同的删除条件逻辑
should_delete = self._matches_delete_condition(obj, typ, oid, uid)
if should_delete:
logger.info(f"✅ 对象 {obj.name} 匹配删除条件")
else:
logger.debug(f"❌ 对象 {obj.name} 不匹配删除条件")
return should_delete
except Exception as e:
logger.error(f"检查Blender对象删除条件时发生错误: {e}")
return False
def _cleanup_orphaned_meshes(self):
"""清理孤立的网格数据"""
try:
# 清理没有对象的网格
for mesh in bpy.data.meshes:
if mesh.users == 0:
bpy.data.meshes.remove(mesh)
logger.debug(f"清理孤立网格: {mesh.name}")
# 清理没有对象的材质
for material in bpy.data.materials:
if material.users == 0:
bpy.data.materials.remove(material)
logger.debug(f"清理孤立材质: {material.name}")
except Exception as e:
logger.error(f"清理孤立数据时发生错误: {e}")
def _get_entity_attribute(self, entity, attr_name: str):
"""获取实体属性 - 改进版本"""
try:
if BLENDER_AVAILABLE and hasattr(entity, 'get'):
# 【修复7】检查多种可能的属性名
possible_attrs = [
f"sw_{attr_name}",
attr_name,
f"sw{attr_name}"
]
for attr in possible_attrs:
value = entity.get(attr)
if value is not None:
return value
return None
except:
return None
def _delete_entity_safe(self, entity) -> bool:
"""安全删除实体 - 改进版本"""
try:
if not entity or not BLENDER_AVAILABLE:
return False
# 【修复8】更全面的对象检查
if hasattr(entity, 'name'):
# 检查是否在Blender对象中
if entity.name in bpy.data.objects:
bpy.data.objects.remove(entity, do_unlink=True)
return True
# 检查是否在网格中
elif entity.name in bpy.data.meshes:
bpy.data.meshes.remove(entity, do_unlink=True)
return True
# 检查是否在材质中
elif entity.name in bpy.data.materials:
bpy.data.materials.remove(entity, do_unlink=True)
return True
return False
except Exception as e:
logger.error(f"删除实体失败: {e}")
return False
def _is_entity_valid(self, entity) -> bool:
"""检查实体是否有效"""
try:
if entity is None:
return False
# 检查是否是 Blender 对象
if hasattr(entity, 'name'):
# 检查对象是否已被删除
if hasattr(entity, 'is_valid'):
return entity.is_valid
elif hasattr(entity, 'users'):
return entity.users > 0
else:
return True
# 检查是否是字典或其他类型
return entity is not None
except Exception:
return False
# ==================== 统计和管理方法 ====================
def get_data_stats(self) -> Dict[str, Any]:
"""获取数据统计信息"""
with self.lock:
return {
"total_units": len(set(list(self.zones.keys()) + list(self.parts.keys()) + list(self.hardwares.keys()))),
"zones": {
"units": len(self.zones),
"total_zones": sum(len(zones) for zones in self.zones.values())
},
"parts": {
"units": len(self.parts),
"total_parts": sum(len(parts) for parts in self.parts.values())
},
"hardwares": {
"units": len(self.hardwares),
"total_hardwares": sum(len(hws) for hws in self.hardwares.values())
},
"machinings": {
"units": len(self.machinings),
"total_machinings": sum(len(machs) for machs in self.machinings.values())
},
"dimensions": {
"units": len(self.dimensions),
"total_dimensions": sum(len(dims) for dims in self.dimensions.values())
},
"textures": len(self.textures),
"selected_objects": {
"uid": self.selected_uid,
"obj": self.selected_obj,
"faces": len(self.selected_faces),
"parts": len(self.selected_parts),
"hardwares": len(self.selected_hws)
},
"system_state": {
"part_mode": self.part_mode,
"hide_none": self.hide_none,
"mat_type": self.mat_type,
"back_material": self.back_material,
"added_contour": self.added_contour
}
}
def cleanup_all(self):
"""清理所有数据"""
with self.lock:
self.zones.clear()
self.parts.clear()
self.hardwares.clear()
self.labels.clear()
self.door_labels.clear()
self.machinings.clear()
self.dimensions.clear()
self.textures.clear()
self.unit_params.clear()
self.unit_trans.clear()
self.sel_clear()
logger.info("✅ 数据管理器清理完成")
# ==================== 全局数据管理器实例 ====================
# 全局数据管理器实例
data_manager: Optional[DataManager] = None
def init_data_manager() -> DataManager:
"""初始化全局数据管理器实例"""
global data_manager
if data_manager is None:
data_manager = DataManager()
return data_manager
def get_data_manager() -> DataManager:
"""获取全局数据管理器实例"""
global data_manager
if data_manager is None:
data_manager = init_data_manager()
return data_manager
# 自动初始化
data_manager = init_data_manager()
# ==================== 兼容性函数 ====================
def get_zones(data: Dict[str, Any]) -> Dict[str, Any]:
"""兼容性函数 - 获取zones"""
return data_manager.get_zones(data)
def get_parts(data: Dict[str, Any]) -> Dict[str, Any]:
"""兼容性函数 - 获取parts"""
return data_manager.get_parts(data)
def get_hardwares(data: Dict[str, Any]) -> Dict[str, Any]:
"""兼容性函数 - 获取hardwares"""
return data_manager.get_hardwares(data)
def get_texture(key: str) -> Any:
"""兼容性函数 - 获取材质"""
return data_manager.get_texture(key)
def sel_clear():
"""兼容性函数 - 清除选择"""
return data_manager.sel_clear()
# ==================== 兼容性函数 ====================
def get_zones(data: Dict[str, Any]) -> Dict[str, Any]:
"""兼容性函数 - 获取zones"""
return data_manager.get_zones(data)
def get_parts(data: Dict[str, Any]) -> Dict[str, Any]:
"""兼容性函数 - 获取parts"""
return data_manager.get_parts(data)
def get_hardwares(data: Dict[str, Any]) -> Dict[str, Any]:
"""兼容性函数 - 获取hardwares"""
return data_manager.get_hardwares(data)
def get_texture(key: str) -> Any:
"""兼容性函数 - 获取材质"""
return data_manager.get_texture(key)
def sel_clear():
"""兼容性函数 - 清除选择"""
return data_manager.sel_clear()