#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ SUW Implementation - Python翻译版本 原文件: SUWImpl.rb (2019行) 用途: 核心实现类,SUWood的主要功能 翻译进度: Phase 1 - 几何类和基础框架 内存管理优化: 应用 Blender Python API 最佳实践 """ import re import math import logging import time import gc import weakref from typing import Optional, Any, Dict, List, Tuple, Union from contextlib import contextmanager # 设置日志 logger = logging.getLogger(__name__) # 尝试相对导入,失败则使用绝对导入 try: from .suw_constants import SUWood except ImportError: try: from suw_constants import SUWood except ImportError: # 如果都找不到,创建一个基本的存根 class SUWood: @staticmethod def suwood_path(version): return "." try: import bpy import mathutils import bmesh BLENDER_AVAILABLE = True except ImportError: BLENDER_AVAILABLE = False print("⚠️ Blender API 不可用,使用基础几何类") # 创建存根mathutils模块 class MockMathutils: class Vector: def __init__(self, vec): self.x, self.y, self.z = vec[:3] if len( vec) >= 3 else (vec + [0, 0])[:3] def normalized(self): return self def dot(self, other): return 0 class Matrix: @staticmethod def Scale(scale, size, axis): return MockMathutils.Matrix() @staticmethod def Translation(vec): return MockMathutils.Matrix() @staticmethod def Rotation(angle, size): return MockMathutils.Matrix() def __matmul__(self, other): return MockMathutils.Matrix() mathutils = MockMathutils() # ==================== 内存管理核心类 ==================== class BlenderMemoryManager: """Blender 内存管理器 - 应用最佳实践""" def __init__(self): self.object_registry = weakref.WeakSet() self.mesh_registry = weakref.WeakSet() self.material_registry = weakref.WeakSet() self.creation_stats = { 'objects_created': 0, 'meshes_created': 0, 'materials_created': 0, 'cleanup_calls': 0, 'failed_cleanups': 0 } self.last_cleanup_time = time.time() self.cleanup_interval = 300 # 5分钟 def register_object(self, obj): """注册对象到弱引用集合""" if obj: self.object_registry.add(obj) self.creation_stats['objects_created'] += 1 def register_mesh(self, mesh): """注册网格到弱引用集合""" if mesh: self.mesh_registry.add(mesh) self.creation_stats['meshes_created'] += 1 def should_cleanup(self): """检查是否应该执行清理""" return time.time() - self.last_cleanup_time > self.cleanup_interval def cleanup_orphaned_data(self): """清理孤立数据""" if not BLENDER_AVAILABLE: return 0 cleanup_count = 0 try: # 清理孤立网格 for mesh in list(bpy.data.meshes): if mesh.users == 0: try: bpy.data.meshes.remove(mesh, do_unlink=True) cleanup_count += 1 except: pass # 清理孤立材质 for material in list(bpy.data.materials): if material.users == 0: try: bpy.data.materials.remove(material, do_unlink=True) cleanup_count += 1 except: pass # 清理孤立纹理 for texture in list(bpy.data.textures): if texture.users == 0: try: bpy.data.textures.remove(texture, do_unlink=True) cleanup_count += 1 except: pass # 更新清理时间 self.last_cleanup_time = time.time() self.creation_stats['cleanup_calls'] += 1 logger.info(f"清理了 {cleanup_count} 个孤立数据块") return cleanup_count except Exception as e: logger.error(f"清理孤立数据失败: {e}") self.creation_stats['failed_cleanups'] += 1 return 0 # 全局内存管理器实例 memory_manager = BlenderMemoryManager() @contextmanager def safe_blender_operation(operation_name: str): """ 安全的 Blender 操作上下文管理器 使用方法: with safe_blender_operation("创建网格"): # 执行 Blender 操作 pass """ logger.info(f"开始操作: {operation_name}") start_time = time.time() # 记录初始状态 initial_objects = len(bpy.data.objects) if BLENDER_AVAILABLE else 0 initial_meshes = len(bpy.data.meshes) if BLENDER_AVAILABLE else 0 try: yield except Exception as e: logger.error(f"操作失败 {operation_name}: {e}") # 尝试清理 try: if memory_manager.should_cleanup(): memory_manager.cleanup_orphaned_data() gc.collect() except: pass raise finally: # 记录最终状态 final_objects = len(bpy.data.objects) if BLENDER_AVAILABLE else 0 final_meshes = len(bpy.data.meshes) if BLENDER_AVAILABLE else 0 elapsed_time = time.time() - start_time logger.info(f"操作完成: {operation_name}") logger.info(f"耗时: {elapsed_time:.2f}秒") logger.info(f"对象变化: {final_objects - initial_objects}") logger.info(f"网格变化: {final_meshes - initial_meshes}") # ==================== 几何类扩展 ==================== class Point3d: """3D点类 - 对应Ruby的Geom::Point3d""" def __init__(self, x: float = 0.0, y: float = 0.0, z: float = 0.0): self.x = x self.y = y self.z = z @classmethod def parse(cls, value: str): """从字符串解析3D点""" if not value or value.strip() == "": return None # 解析格式: "(x,y,z)" 或 "x,y,z" clean_value = re.sub(r'[()]*', '', value) xyz = [float(axis.strip()) for axis in clean_value.split(',')] # 转换mm为内部单位(假设输入是mm) return cls(xyz[0] * 0.001, xyz[1] * 0.001, xyz[2] * 0.001) def to_s(self, unit: str = "mm", digits: int = -1) -> str: """转换为字符串""" if unit == "cm": x_val = self.x * 100 # 内部单位转换为cm y_val = self.y * 100 z_val = self.z * 100 return f"({x_val:.3f}, {y_val:.3f}, {z_val:.3f})" else: # mm x_val = self.x * 1000 # 内部单位转换为mm y_val = self.y * 1000 z_val = self.z * 1000 if digits == -1: return f"({x_val}, {y_val}, {z_val})" else: return f"({x_val:.{digits}f}, {y_val:.{digits}f}, {z_val:.{digits}f})" def __str__(self): return self.to_s() def __repr__(self): return f"Point3d({self.x}, {self.y}, {self.z})" class Vector3d: """3D向量类 - 对应Ruby的Geom::Vector3d""" def __init__(self, x: float = 0.0, y: float = 0.0, z: float = 0.0): self.x = x self.y = y self.z = z @classmethod def parse(cls, value: str): """从字符串解析3D向量""" if not value or value.strip() == "": return None clean_value = re.sub(r'[()]*', '', value) xyz = [float(axis.strip()) for axis in clean_value.split(',')] return cls(xyz[0] * 0.001, xyz[1] * 0.001, xyz[2] * 0.001) def to_s(self, unit: str = "mm") -> str: """转换为字符串""" if unit == "cm": x_val = self.x * 100 # 内部单位转换为cm y_val = self.y * 100 z_val = self.z * 100 return f"({x_val:.3f}, {y_val:.3f}, {z_val:.3f})" elif unit == "in": return f"({self.x}, {self.y}, {self.z})" else: # mm x_val = self.x * 1000 # 内部单位转换为mm y_val = self.y * 1000 z_val = self.z * 1000 return f"({x_val}, {y_val}, {z_val})" def normalize(self): """归一化向量""" length = math.sqrt(self.x**2 + self.y**2 + self.z**2) if length > 0: return Vector3d(self.x/length, self.y/length, self.z/length) return Vector3d(0, 0, 0) def __str__(self): return self.to_s() class Transformation: """变换矩阵类 - 对应Ruby的Geom::Transformation""" def __init__(self, origin: Point3d = None, x_axis: Vector3d = None, y_axis: Vector3d = None, z_axis: Vector3d = None): self.origin = origin or Point3d(0, 0, 0) self.x_axis = x_axis or Vector3d(1, 0, 0) self.y_axis = y_axis or Vector3d(0, 1, 0) self.z_axis = z_axis or Vector3d(0, 0, 1) @classmethod def parse(cls, data: Dict[str, str]): """从字典解析变换""" origin = Point3d.parse(data.get("o")) x_axis = Vector3d.parse(data.get("x")) y_axis = Vector3d.parse(data.get("y")) z_axis = Vector3d.parse(data.get("z")) return cls(origin, x_axis, y_axis, z_axis) def store(self, data: Dict[str, str]): """存储变换到字典""" data["o"] = self.origin.to_s("mm") data["x"] = self.x_axis.to_s("in") data["y"] = self.y_axis.to_s("in") data["z"] = self.z_axis.to_s("in") # ==================== SUWood 材质类型常量 ==================== MAT_TYPE_NORMAL = 0 MAT_TYPE_OBVERSE = 1 MAT_TYPE_NATURE = 2 # ==================== SUWImpl 核心实现类 ==================== class SUWImpl: """SUWood核心实现类 - 完整翻译版本,应用内存管理最佳实践""" _instance = None _selected_uid = None _selected_obj = None _selected_zone = None _selected_part = None _scaled_zone = None _server_path = None _default_zone = None _creation_lock = False _mesh_creation_count = 0 _batch_operation_active = False def __init__(self): """初始化SUWImpl实例""" # 基础属性 self.parts = {} self.zones = {} self.hardwares = {} self.materials = {} self.textures = {} # 内存管理相关 self.object_references = {} # 存储对象名称而非引用 self.mesh_cache = {} self.material_cache = {} # 批量操作优化 self.deferred_updates = [] self.batch_size = 50 logger.info("SUWImpl 初始化完成,启用内存管理优化") @classmethod def get_instance(cls): """获取单例实例""" if cls._instance is None: cls._instance = cls() return cls._instance def startup(self): """启动SUWood系统""" logger.info("启动SUWood系统...") if BLENDER_AVAILABLE: try: self._create_layers() self._init_materials() self._init_default_zone() logger.info("✅ SUWood系统启动成功") except Exception as e: logger.error(f"❌ SUWood系统启动失败: {e}") raise else: logger.warning("⚠️ Blender不可用,使用基础模式") def _create_layers(self): """创建图层""" if not BLENDER_AVAILABLE: return try: # 创建必要的集合 collections = ["SUWood_Parts", "SUWood_Hardware", "SUWood_Zones"] for collection_name in collections: if collection_name not in bpy.data.collections: collection = bpy.data.collections.new(collection_name) bpy.context.scene.collection.children.link(collection) logger.info(f"创建集合: {collection_name}") except Exception as e: logger.error(f"创建图层失败: {e}") def _init_materials(self): """初始化材质""" if not BLENDER_AVAILABLE: return try: # 创建默认材质 default_materials = [ ("SUWood_Default", (0.8, 0.8, 0.8, 1.0)), ("SUWood_Wood", (0.6, 0.4, 0.2, 1.0)), ("SUWood_Metal", (0.7, 0.7, 0.8, 1.0)), ] for mat_name, color in default_materials: if mat_name not in bpy.data.materials: material = bpy.data.materials.new(mat_name) material.use_nodes = True if material.node_tree: principled = material.node_tree.nodes.get( "Principled BSDF") if principled: principled.inputs[0].default_value = color self.material_cache[mat_name] = material.name # 存储名称而非引用 memory_manager.register_object(material) logger.info(f"创建材质: {mat_name}") except Exception as e: logger.error(f"初始化材质失败: {e}") def _init_default_zone(self): """初始化默认区域""" try: self._default_zone = { "name": "Default Zone", "id": "default_zone_001", "visible": True, "parts": {}, "hardwares": {} } self.zones["default"] = self._default_zone logger.info("初始化默认区域") except Exception as e: logger.error(f"初始化默认区域失败: {e}") # ==================== 内存管理优化的核心方法 ==================== def validate_mesh_data(self, vertices: List, faces: List) -> bool: """验证网格数据有效性""" try: # 检查顶点数量 if len(vertices) < 3: logger.warning("顶点数量不足") return False # 检查面数据 for face in faces: if len(face) < 3: logger.warning("面顶点数量不足") return False # 检查顶点索引有效性 for vertex_index in face: if vertex_index >= len(vertices) or vertex_index < 0: logger.warning(f"无效的顶点索引: {vertex_index}") return False return True except Exception as e: logger.error(f"网格数据验证失败: {e}") return False def validate_coplanar_points(self, points: List[Tuple[float, float, float]]) -> bool: """验证点是否共面""" if len(points) < 4: return True # 3个点总是共面的 try: # 使用前三个点定义平面 p1 = mathutils.Vector(points[0]) p2 = mathutils.Vector(points[1]) p3 = mathutils.Vector(points[2]) # 计算平面法向量 normal = (p2 - p1).cross(p3 - p1).normalized() # 检查其他点是否在同一平面上 tolerance = 1e-6 for point in points[3:]: p = mathutils.Vector(point) distance = abs((p - p1).dot(normal)) if distance > tolerance: return False return True except Exception as e: logger.error(f"共面检查失败: {e}") return False @contextmanager def atomic_mesh_creation(self, name: str): """原子网格创建上下文管理器""" mesh = None obj = None try: # 第1步:创建网格数据 mesh = bpy.data.meshes.new(name) memory_manager.register_mesh(mesh) yield mesh except Exception as e: logger.error(f"原子网格创建失败: {e}") # 自动清理 if obj: self._cleanup_object_safe(obj) if mesh: self._cleanup_mesh_safe(mesh) raise def _cleanup_mesh_safe(self, mesh): """安全清理网格""" try: if mesh and mesh.name in bpy.data.meshes: bpy.data.meshes.remove(mesh, do_unlink=True) memory_manager.creation_stats['cleanup_calls'] += 1 logger.info(f"清理网格: {mesh.name}") except Exception as e: logger.error(f"网格清理失败: {e}") memory_manager.creation_stats['failed_cleanups'] += 1 def _cleanup_object_safe(self, obj): """安全清理对象""" try: if obj and obj.name in bpy.data.objects: # 从所有集合中移除 for collection in obj.users_collection: collection.objects.unlink(obj) # 删除对象 bpy.data.objects.remove(obj, do_unlink=True) memory_manager.creation_stats['cleanup_calls'] += 1 logger.info(f"清理对象: {obj.name}") except Exception as e: logger.error(f"对象清理失败: {e}") memory_manager.creation_stats['failed_cleanups'] += 1 def _create_single_mesh_atomic(self, container: Any, surface: Dict[str, Any], color: str = None, scale: float = None, angle: float = None, series: List = None, uid: str = None, zid: Any = None, pid: Any = None, child: Any = None, options: Dict = None) -> Any: """优化的原子性mesh创建""" with safe_blender_operation(f"创建网格 uid={uid}"): try: # 增加创建计数 self.__class__._mesh_creation_count += 1 creation_count = self.__class__._mesh_creation_count # 定期清理 if creation_count % 100 == 0: logger.info(f"执行定期清理,当前创建数: {creation_count}") memory_manager.cleanup_orphaned_data() gc.collect() # 数据验证 segs = surface.get("segs", []) points = [] for seg in segs: if isinstance(seg, list) and len(seg) >= 1: point_str = seg[0] if isinstance( seg[0], str) else str(seg[0]) try: point = Point3d.parse(point_str) if point: points.append(point) except Exception as e: logger.warning(f"点解析失败: {point_str}") if len(points) < 3: logger.warning(f"有效点数不足: {len(points)}") return None # 面积验证 area = self._calculate_face_area(points) if area < 1e-6: logger.warning(f"面积过小: {area}") return None # 验证几何数据 vertices = [(float(p.x), float(p.y), float(p.z)) for p in points] faces = [list(range(len(vertices)))] if len( vertices) >= 3 else [] if not self.validate_mesh_data(vertices, faces): logger.warning("网格数据验证失败") return None # 原子性创建 timestamp = hash( str(points[0]) + str(uid) + str(child) + str(creation_count)) mesh_name = f"SUWMesh_{abs(timestamp) % 10000}" with self.atomic_mesh_creation(mesh_name) as mesh: # 设置几何数据 mesh.from_pydata(vertices, [], faces) # 创建UV坐标 if len(vertices) >= 3: try: uv_layer = mesh.uv_layers.new(name="UVMap") self._generate_uv_coordinates(mesh, uv_layer) except Exception as e: logger.warning(f"UV坐标创建失败: {e}") # 安全更新网格 if not self._safe_mesh_update(mesh): raise RuntimeError("网格更新失败") # 创建对象 obj_name = f"SUWObj_{abs(timestamp) % 10000}" obj = bpy.data.objects.new(obj_name, mesh) memory_manager.register_object(obj) # 添加到集合 if hasattr(container, 'objects'): container.objects.link(obj) else: bpy.context.scene.collection.objects.link(obj) # 设置属性 self._set_object_attributes_safe( obj, uid, zid, pid, child, area, creation_count) # 应用材质 if color: self._apply_material_safe(obj, color) # 存储对象引用(名称而非对象) self.object_references[obj_name] = { 'uid': uid, 'type': 'mesh', 'creation_time': time.time() } logger.info(f"✅ 网格创建成功: {obj.name}, {len(vertices)}顶点") return obj except Exception as e: logger.error(f"网格创建失败: {e}") return None def _generate_uv_coordinates(self, mesh, uv_layer): """生成UV坐标""" try: for poly in mesh.polygons: for i, loop_index in enumerate(poly.loop_indices): vertex_count = len(poly.vertices) if vertex_count == 4: # 四边形 uv_coords = [(0, 0), (1, 0), (1, 1), (0, 1)] uv_layer.data[loop_index].uv = uv_coords[i % 4] elif vertex_count == 3: # 三角形 uv_coords = [(0, 0), (1, 0), (0.5, 1)] uv_layer.data[loop_index].uv = uv_coords[i % 3] else: # 其他多边形 angle = (i / vertex_count) * 2 * math.pi u = (math.cos(angle) + 1) * 0.5 v = (math.sin(angle) + 1) * 0.5 uv_layer.data[loop_index].uv = (u, v) except Exception as e: logger.error(f"UV坐标生成失败: {e}") def _safe_mesh_update(self, mesh, max_retries=3): """安全更新网格""" for attempt in range(max_retries): try: # 检查网格是否仍然有效 if not mesh or mesh.name not in bpy.data.meshes: return False # 更新网格 mesh.update() # 验证更新结果 if not mesh.vertices: raise RuntimeError("网格更新后无顶点") return True except Exception as e: logger.warning(f"网格更新失败,尝试 {attempt + 1}/{max_retries}: {e}") if attempt < max_retries - 1: time.sleep(0.1) # 短暂等待 else: return False return False def _set_object_attributes_safe(self, obj, uid, zid, pid, child, area, creation_count): """安全设置对象属性""" try: obj["uid"] = str(uid) if uid else "unknown" obj["zid"] = str(zid) if zid else "unknown" obj["pid"] = str(pid) if pid else "unknown" obj["child"] = str(child) if child else "unknown" obj["area"] = float(area) if area else 0.0 obj["creation_index"] = creation_count obj["creation_time"] = time.time() obj["memory_managed"] = True except Exception as e: logger.error(f"设置对象属性失败: {e}") def _apply_material_safe(self, obj, color): """安全应用材质""" try: material = self.get_texture(color) if material: if obj.data.materials: obj.data.materials[0] = material else: obj.data.materials.append(material) logger.info(f"材质应用成功: {color}") else: self._apply_simple_material(obj, color) except Exception as e: logger.error(f"材质应用失败: {e}") try: self._apply_simple_material(obj, color) except: pass def _apply_simple_material(self, obj: Any, color: str): """应用简单材质""" try: # 创建或获取缓存的材质 mat_name = f"SUWMat_{color}" if color else "SUWMat_default" if mat_name in self.material_cache: material_name = self.material_cache[mat_name] if material_name in bpy.data.materials: material = bpy.data.materials[material_name] obj.data.materials.append(material) return # 创建新材质 material = bpy.data.materials.new(mat_name) material.use_nodes = True if material.node_tree and material.node_tree.nodes: principled = material.node_tree.nodes.get("Principled BSDF") if principled: if color and len(color) >= 6: try: # 解析颜色 r = int(color[0:2], 16) / 255.0 g = int(color[2:4], 16) / 255.0 b = int(color[4:6], 16) / 255.0 principled.inputs[0].default_value = (r, g, b, 1.0) except: principled.inputs[0].default_value = ( 0.8, 0.8, 0.8, 1.0) else: principled.inputs[0].default_value = ( 0.8, 0.8, 0.8, 1.0) # 缓存材质 self.material_cache[mat_name] = material.name memory_manager.register_object(material) # 应用材质 obj.data.materials.append(material) logger.info(f"简单材质创建并应用: {mat_name}") except Exception as e: logger.error(f"简单材质应用失败: {e}") # ==================== 优化的删除和清理方法 ==================== def _force_delete_objects_optimized(self, objects_to_delete: List[Any]): """优化的强制删除对象""" try: if not BLENDER_AVAILABLE: return logger.info(f"开始优化删除 {len(objects_to_delete)} 个对象") # 预处理:收集对象名称 object_names = [] for obj in objects_to_delete: if obj and hasattr(obj, 'name'): object_names.append(obj.name) # 批量删除 deleted_count = 0 for obj_name in object_names: try: if obj_name in bpy.data.objects: obj = bpy.data.objects[obj_name] # 从集合中移除 collections_to_unlink = list(obj.users_collection) for collection in collections_to_unlink: try: collection.objects.unlink(obj) except: pass # 删除对象 bpy.data.objects.remove(obj, do_unlink=True) deleted_count += 1 # 从引用中移除 if obj_name in self.object_references: del self.object_references[obj_name] except Exception as e: logger.error(f"删除对象失败 {obj_name}: {e}") # 更新依赖图 if deleted_count > 0: self._update_dependency_graph(full_update=True) logger.info(f"✅ 优化删除完成: {deleted_count} 个对象") except Exception as e: logger.error(f"优化删除失败: {e}") def _update_dependency_graph(self, full_update: bool = False): """更新Blender依赖图""" try: if not BLENDER_AVAILABLE: return if full_update: logger.info("执行全局依赖图更新") bpy.context.view_layer.update() bpy.context.evaluated_depsgraph_get().update() # 刷新视图 try: for area in bpy.context.screen.areas: if area.type in ['VIEW_3D', 'OUTLINER']: area.tag_redraw() except: pass logger.info("全局依赖图更新完成") else: # 快速更新 bpy.context.view_layer.update() except Exception as e: logger.error(f"依赖图更新失败: {e}") # ==================== 保持原有业务逻辑的方法 ==================== def get_zones(self, data: Dict[str, Any]) -> Dict[str, Any]: """获取区域信息""" return self.zones def get_parts(self, data: Dict[str, Any]) -> Dict[str, Any]: """获取零件信息""" return self.parts def get_hardwares(self, data: Dict[str, Any]) -> Dict[str, Any]: """获取硬件信息""" return self.hardwares def get_texture(self, key: str): """获取纹理材质""" if not BLENDER_AVAILABLE: return None try: # 从缓存中获取 if key in self.material_cache: material_name = self.material_cache[key] if material_name in bpy.data.materials: return bpy.data.materials[material_name] # 查找现有材质 for material in bpy.data.materials: if key in material.name: self.material_cache[key] = material.name return material return None except Exception as e: logger.error(f"获取纹理失败: {e}") return None def create_face(self, container: Any, surface: Dict[str, Any], color: str = None, scale: float = None, angle: float = None, series: List = None, uid: str = None, zid: Any = None, pid: Any = None, child: Any = None) -> Any: """创建面 - 使用优化的内存管理""" try: return self._create_single_mesh_atomic( container, surface, color, scale, angle, series, uid, zid, pid, child ) except Exception as e: logger.error(f"创建面失败: {e}") return None def _calculate_face_area(self, points: List[Point3d]) -> float: """计算面积""" try: if len(points) < 3: return 0.0 # 使用三角剖分计算面积 area = 0.0 for i in range(1, len(points) - 1): p1 = points[0] p2 = points[i] p3 = points[i + 1] # 计算三角形面积 v1 = Vector3d(p2.x - p1.x, p2.y - p1.y, p2.z - p1.z) v2 = Vector3d(p3.x - p1.x, p3.y - p1.y, p3.z - p1.z) # 叉积的模长的一半就是三角形面积 cross_x = v1.y * v2.z - v1.z * v2.y cross_y = v1.z * v2.x - v1.x * v2.z cross_z = v1.x * v2.y - v1.y * v2.x triangle_area = 0.5 * \ math.sqrt(cross_x**2 + cross_y**2 + cross_z**2) area += triangle_area return area except Exception as e: logger.error(f"计算面积失败: {e}") return 0.0 # ==================== 系统管理方法 ==================== def get_creation_stats(self) -> Dict[str, Any]: """获取创建统计信息""" try: stats = { "creation_count": self.__class__._mesh_creation_count, "object_references": len(self.object_references), "mesh_cache_size": len(self.mesh_cache), "material_cache_size": len(self.material_cache), "memory_manager_stats": memory_manager.creation_stats.copy(), "blender_available": BLENDER_AVAILABLE } if BLENDER_AVAILABLE: stats["total_objects"] = len(bpy.data.objects) stats["total_meshes"] = len(bpy.data.meshes) stats["total_materials"] = len(bpy.data.materials) return stats except Exception as e: logger.error(f"获取统计信息失败: {e}") return {"error": str(e)} def force_cleanup(self): """强制清理""" try: logger.info("开始强制清理") # 清理孤立数据 cleanup_count = memory_manager.cleanup_orphaned_data() # 清理缓存 self.mesh_cache.clear() # 清理过期的对象引用 current_time = time.time() expired_refs = [] for obj_name, ref_info in self.object_references.items(): if current_time - ref_info.get('creation_time', 0) > 3600: # 1小时 expired_refs.append(obj_name) for obj_name in expired_refs: del self.object_references[obj_name] # 强制垃圾回收 gc.collect() # 更新依赖图 if BLENDER_AVAILABLE: self._update_dependency_graph(full_update=True) logger.info( f"强制清理完成: 清理了 {cleanup_count} 个数据块,{len(expired_refs)} 个过期引用") except Exception as e: logger.error(f"强制清理失败: {e}") def diagnose_system_state(self): """诊断系统状态""" try: logger.info("=== 系统状态诊断 ===") # 内存使用情况 stats = self.get_creation_stats() for key, value in stats.items(): logger.info(f"{key}: {value}") # 检查潜在问题 issues = [] if BLENDER_AVAILABLE: # 检查孤立数据 orphaned_meshes = [m for m in bpy.data.meshes if m.users == 0] if orphaned_meshes: issues.append(f"发现 {len(orphaned_meshes)} 个孤立网格") # 检查空网格 empty_meshes = [m for m in bpy.data.meshes if not m.vertices] if empty_meshes: issues.append(f"发现 {len(empty_meshes)} 个空网格") # 检查总顶点数 total_vertices = sum(len(m.vertices) for m in bpy.data.meshes) if total_vertices > 1000000: # 100万顶点 issues.append(f"顶点数量过多: {total_vertices}") if issues: logger.warning("发现问题:") for issue in issues: logger.warning(f" - {issue}") else: logger.info("✅ 系统状态正常") return issues except Exception as e: logger.error(f"系统诊断失败: {e}") return [f"诊断失败: {e}"] # ==================== 其他必要的方法存根 ==================== def c09(self, data: Dict[str, Any]): """删除操作 - 使用优化的内存管理""" try: uid = data.get("uid") typ = data.get("typ", "uid") oid = data.get("oid") logger.info(f"执行删除操作: uid={uid}, typ={typ}, oid={oid}") # 查找要删除的对象 objects_to_delete = self._find_objects_for_deletion(uid, typ, oid) if objects_to_delete: # 使用优化的删除方法 self._force_delete_objects_optimized(objects_to_delete) # 清理数据 self._safe_data_cleanup(uid, typ, oid) except Exception as e: logger.error(f"删除操作失败: {e}") def _find_objects_for_deletion(self, uid: str, typ: str, oid: Any) -> List[Any]: """查找要删除的对象""" try: if not BLENDER_AVAILABLE: return [] objects_to_delete = [] # 根据对象引用查找 for obj_name, ref_info in self.object_references.items(): if ref_info.get('uid') == uid: if obj_name in bpy.data.objects: objects_to_delete.append(bpy.data.objects[obj_name]) # 根据对象属性查找 for obj in bpy.data.objects: if obj.get("uid") == uid: objects_to_delete.append(obj) logger.info(f"找到 {len(objects_to_delete)} 个要删除的对象") return objects_to_delete except Exception as e: logger.error(f"查找删除对象失败: {e}") return [] def _safe_data_cleanup(self, uid: str, typ: str, oid: Any): """安全的数据清理""" try: cleanup_count = 0 # 清理内部数据结构 data_fields = [self.parts, self.zones, self.hardwares] for data_dict in data_fields: if uid in data_dict: del data_dict[uid] cleanup_count += 1 # 清理对象引用 refs_to_remove = [] for obj_name, ref_info in self.object_references.items(): if ref_info.get('uid') == uid: refs_to_remove.append(obj_name) for obj_name in refs_to_remove: del self.object_references[obj_name] cleanup_count += 1 logger.info(f"数据清理完成: {cleanup_count} 项") except Exception as e: logger.error(f"数据清理失败: {e}") # ==================== 其他业务逻辑方法的存根 ==================== def c00(self, data: Dict[str, Any]): """业务方法存根""" pass def c01(self, data: Dict[str, Any]): """创建区域""" try: uid = data.get("uid") zone_data = data.get("zone", {}) with safe_blender_operation(f"创建区域 uid={uid}"): # 创建区域集合 collection_name = f"Zone_{uid}" if BLENDER_AVAILABLE and collection_name not in bpy.data.collections: collection = bpy.data.collections.new(collection_name) bpy.context.scene.collection.children.link(collection) memory_manager.register_object(collection) # 存储区域数据 self.zones[uid] = { "collection_name": collection_name, "data": zone_data, "parts": {}, "creation_time": time.time() } logger.info(f"创建区域成功: {uid}") except Exception as e: logger.error(f"创建区域失败: {e}") def c02(self, data: Dict[str, Any]): """创建零件""" try: uid = data.get("uid") parts_data = data.get("parts", {}) with safe_blender_operation(f"创建零件 uid={uid}"): # 批量创建零件 created_parts = {} for part_id, part_data in parts_data.items(): try: # 创建零件集合 collection_name = f"Part_{uid}_{part_id}" if BLENDER_AVAILABLE and collection_name not in bpy.data.collections: collection = bpy.data.collections.new( collection_name) # 添加到对应的区域集合 if uid in self.zones: zone_collection_name = self.zones[uid]["collection_name"] if zone_collection_name in bpy.data.collections: parent_collection = bpy.data.collections[zone_collection_name] parent_collection.children.link(collection) else: bpy.context.scene.collection.children.link( collection) memory_manager.register_object(collection) created_parts[part_id] = { "collection_name": collection_name, "data": part_data, "creation_time": time.time() } except Exception as e: logger.error(f"创建零件失败 {part_id}: {e}") # 存储零件数据 self.parts[uid] = created_parts logger.info(f"创建零件成功: {uid}, {len(created_parts)} 个零件") except Exception as e: logger.error(f"创建零件操作失败: {e}") def c03(self, data: Dict[str, Any]): """创建硬件""" try: uid = data.get("uid") hardware_data = data.get("hardwares", {}) with safe_blender_operation(f"创建硬件 uid={uid}"): created_hardware = {} for hw_id, hw_data in hardware_data.items(): try: # 创建硬件集合 collection_name = f"Hardware_{uid}_{hw_id}" if BLENDER_AVAILABLE and collection_name not in bpy.data.collections: collection = bpy.data.collections.new( collection_name) bpy.context.scene.collection.children.link( collection) memory_manager.register_object(collection) created_hardware[hw_id] = { "collection_name": collection_name, "data": hw_data, "creation_time": time.time() } except Exception as e: logger.error(f"创建硬件失败 {hw_id}: {e}") # 存储硬件数据 self.hardwares[uid] = created_hardware logger.info(f"创建硬件成功: {uid}, {len(created_hardware)} 个硬件") except Exception as e: logger.error(f"创建硬件操作失败: {e}") def c04(self, data: Dict[str, Any]): """创建面和边 - 优化版本""" try: uid = data.get("uid") parts_data = data.get("parts", {}) with safe_blender_operation(f"创建面和边 uid={uid}"): # 批量创建优化 total_created = 0 for part_id, part_data in parts_data.items(): try: # 获取零件集合 collection = None if BLENDER_AVAILABLE: collection_name = f"Part_{uid}_{part_id}" if collection_name in bpy.data.collections: collection = bpy.data.collections[collection_name] # 创建面 surfaces = part_data.get("surfaces", []) for surface in surfaces: try: obj = self.create_face( collection or bpy.context.scene.collection, surface, surface.get("color"), surface.get("scale"), surface.get("angle"), uid=uid, pid=part_id ) if obj: total_created += 1 except Exception as e: logger.error(f"创建面失败: {e}") # 创建边 edges = part_data.get("edges", []) for edge in edges: try: self.create_edges( collection or bpy.context.scene.collection, [edge] ) total_created += 1 except Exception as e: logger.error(f"创建边失败: {e}") except Exception as e: logger.error(f"处理零件失败 {part_id}: {e}") logger.info(f"创建面和边完成: {total_created} 个对象") # 定期清理 if total_created > 50: memory_manager.cleanup_orphaned_data() except Exception as e: logger.error(f"创建面和边操作失败: {e}") def create_edges(self, container: Any, edges_data: List) -> List: """创建边""" try: if not BLENDER_AVAILABLE: return [] created_edges = [] for edge_data in edges_data: try: # 解析边的起点和终点 start_point = Point3d.parse( edge_data.get("start", "0,0,0")) end_point = Point3d.parse(edge_data.get("end", "0,0,0")) if start_point and end_point: edge_obj = self._create_line_edge( container, start_point, end_point) if edge_obj: created_edges.append(edge_obj) memory_manager.register_object(edge_obj) except Exception as e: logger.error(f"创建单条边失败: {e}") return created_edges except Exception as e: logger.error(f"创建边失败: {e}") return [] def _create_line_edge(self, container: Any, start: Point3d, end: Point3d) -> Any: """创建线边""" try: if not BLENDER_AVAILABLE: return None # 创建边网格 mesh_name = f"SUWEdge_{hash(str(start) + str(end)) % 10000}" mesh = bpy.data.meshes.new(mesh_name) memory_manager.register_mesh(mesh) # 设置边的顶点和边 vertices = [ (start.x, start.y, start.z), (end.x, end.y, end.z) ] edges = [(0, 1)] mesh.from_pydata(vertices, edges, []) mesh.update() # 创建对象 obj_name = f"SUWEdgeObj_{hash(str(start) + str(end)) % 10000}" obj = bpy.data.objects.new(obj_name, mesh) memory_manager.register_object(obj) # 添加到集合 if hasattr(container, 'objects'): container.objects.link(obj) else: bpy.context.scene.collection.objects.link(obj) return obj except Exception as e: logger.error(f"创建线边失败: {e}") return None # ==================== 选择和可见性管理 ==================== def sel_clear(self): """清除选择""" try: if BLENDER_AVAILABLE: bpy.ops.object.select_all(action='DESELECT') self._selected_uid = None self._selected_obj = None self._selected_zone = None self._selected_part = None logger.info("选择已清除") except Exception as e: logger.error(f"清除选择失败: {e}") def sel_local(self, obj: Any): """选择本地对象""" try: if BLENDER_AVAILABLE and obj: # 清除当前选择 bpy.ops.object.select_all(action='DESELECT') # 选择指定对象 if obj.name in bpy.data.objects: bpy.data.objects[obj.name].select_set(True) bpy.context.view_layer.objects.active = bpy.data.objects[obj.name] # 更新选择状态 self._selected_obj = obj.name # 存储名称而非引用 logger.info(f"选择对象: {obj.name}") except Exception as e: logger.error(f"选择对象失败: {e}") # ==================== 材质和纹理管理 ==================== def add_mat_rgb(self, mat_id: str, alpha: float, r: int, g: int, b: int): """添加RGB材质""" try: if not BLENDER_AVAILABLE: return # 检查材质是否已存在 if mat_id in self.material_cache: material_name = self.material_cache[mat_id] if material_name in bpy.data.materials: return bpy.data.materials[material_name] # 创建新材质 material = bpy.data.materials.new(mat_id) material.use_nodes = True # 设置颜色 if material.node_tree: principled = material.node_tree.nodes.get("Principled BSDF") if principled: color = (r/255.0, g/255.0, b/255.0, alpha) principled.inputs[0].default_value = color # 设置透明度 if alpha < 1.0: material.blend_method = 'BLEND' # Alpha input principled.inputs[21].default_value = alpha # 缓存材质 self.material_cache[mat_id] = material.name memory_manager.register_object(material) logger.info(f"创建RGB材质: {mat_id}") return material except Exception as e: logger.error(f"创建RGB材质失败: {e}") return None # ==================== 系统监控和维护 ==================== def monitor_memory_usage(self): """监控内存使用情况""" try: if not BLENDER_AVAILABLE: return # 检查是否需要清理 if memory_manager.should_cleanup(): logger.info("触发定期内存清理") cleanup_count = memory_manager.cleanup_orphaned_data() if cleanup_count > 0: logger.info(f"清理了 {cleanup_count} 个孤立数据") gc.collect() # 检查内存压力 total_objects = len(bpy.data.objects) total_meshes = len(bpy.data.meshes) if total_objects > 10000 or total_meshes > 5000: logger.warning( f"内存压力警告: {total_objects} 个对象, {total_meshes} 个网格") # 执行强制清理 self.force_cleanup() except Exception as e: logger.error(f"内存监控失败: {e}") def get_memory_report(self) -> Dict[str, Any]: """获取内存报告""" try: report = { "timestamp": time.time(), "creation_stats": self.get_creation_stats(), "system_diagnosis": self.diagnose_system_state(), "memory_manager": { "object_registry_size": len(memory_manager.object_registry), "mesh_registry_size": len(memory_manager.mesh_registry), "last_cleanup": memory_manager.last_cleanup_time, "cleanup_interval": memory_manager.cleanup_interval } } if BLENDER_AVAILABLE: report["blender_data"] = { "objects": len(bpy.data.objects), "meshes": len(bpy.data.meshes), "materials": len(bpy.data.materials), "textures": len(bpy.data.textures), "images": len(bpy.data.images) } return report except Exception as e: logger.error(f"生成内存报告失败: {e}") return {"error": str(e)} # ==================== 属性访问器 ==================== @property def selected_uid(self): return self._selected_uid @property def selected_zone(self): return self._selected_zone @property def selected_part(self): return self._selected_part @property def selected_obj(self): # 返回对象名称而非引用,避免悬空引用 return self._selected_obj @property def server_path(self): return self._server_path @property def default_zone(self): return self._default_zone # ==================== 兼容性方法存根 ==================== def c05(self, data: Dict[str, Any]): """业务方法存根 - 保持兼容性""" logger.info("执行c05操作") def c06(self, data: Dict[str, Any]): """业务方法存根 - 保持兼容性""" logger.info("执行c06操作") def c07(self, data: Dict[str, Any]): """业务方法存根 - 保持兼容性""" logger.info("执行c07操作") def c08(self, data: Dict[str, Any]): """业务方法存根 - 保持兼容性""" logger.info("执行c08操作") def c10(self, data: Dict[str, Any]): """设置门信息""" try: uid = data.get("uid") doors = data.get("drs", []) logger.info(f"设置门信息: uid={uid}, {len(doors)} 个门") # 存储门信息 if uid not in self.parts: self.parts[uid] = {} self.parts[uid]["doors"] = doors except Exception as e: logger.error(f"设置门信息失败: {e}") def show_message(self, data: Dict[str, Any]): """显示消息""" try: message = data.get("message", "") logger.info(f"显示消息: {message}") except Exception as e: logger.error(f"显示消息失败: {e}") def set_config(self, data: Dict[str, Any]): """设置配置""" try: config = data.get("config", {}) logger.info(f"设置配置: {len(config)} 个配置项") # 应用内存管理相关配置 if "memory_cleanup_interval" in config: memory_manager.cleanup_interval = config["memory_cleanup_interval"] if "batch_size" in config: self.batch_size = config["batch_size"] except Exception as e: logger.error(f"设置配置失败: {e}") # ==================== 清理和销毁 ==================== def shutdown(self): """关闭系统""" try: logger.info("开始关闭SUWood系统") # 执行最终清理 self.force_cleanup() # 清理所有缓存 self.mesh_cache.clear() self.material_cache.clear() self.object_references.clear() # 清理数据结构 self.parts.clear() self.zones.clear() self.hardwares.clear() logger.info("✅ SUWood系统关闭完成") except Exception as e: logger.error(f"关闭系统失败: {e}") def __del__(self): """析构函数""" try: self.shutdown() except: pass # ==================== 模块级别的便利函数 ==================== def create_suw_instance(): """创建SUW实例的便利函数""" return SUWImpl.get_instance() def cleanup_blender_memory(): """清理Blender内存的便利函数""" try: if BLENDER_AVAILABLE: cleanup_count = memory_manager.cleanup_orphaned_data() gc.collect() logger.info(f"清理了 {cleanup_count} 个孤立数据") return cleanup_count return 0 except Exception as e: logger.error(f"清理内存失败: {e}") return 0 def get_memory_usage_summary(): """获取内存使用摘要""" try: if BLENDER_AVAILABLE: return { "objects": len(bpy.data.objects), "meshes": len(bpy.data.meshes), "materials": len(bpy.data.materials), "memory_manager_stats": memory_manager.creation_stats.copy() } return {"blender_available": False} except Exception as e: logger.error(f"获取内存使用摘要失败: {e}") return {"error": str(e)} # ==================== 主程序入口 ==================== if __name__ == "__main__": # 测试代码 try: logger.info("开始测试SUWImpl内存管理") # 创建实例 suw = create_suw_instance() suw.startup() # 获取内存报告 report = suw.get_memory_report() logger.info(f"内存报告: {report}") # 执行诊断 issues = suw.diagnose_system_state() if not issues: logger.info("✅ 系统状态正常") # 清理测试 cleanup_count = cleanup_blender_memory() logger.info(f"清理测试完成: {cleanup_count}") except Exception as e: logger.error(f"测试失败: {e}") finally: logger.info("测试完成")