suwoodblender/blenderpython/suw_impl - 副本 (17).py

1712 lines
58 KiB
Python
Raw Permalink Normal View History

2025-07-18 17:09:39 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SUW Implementation - Python翻译版本
原文件: SUWImpl.rb (2019)
用途: 核心实现类SUWood的主要功能
翻译进度: Phase 1 - 几何类和基础框架
内存管理优化: 应用 Blender Python API 最佳实践
"""
import re
import math
import logging
import time
import gc
import weakref
from typing import Optional, Any, Dict, List, Tuple, Union
from contextlib import contextmanager
# 设置日志
logger = logging.getLogger(__name__)
# 尝试相对导入,失败则使用绝对导入
try:
from .suw_constants import SUWood
except ImportError:
try:
from suw_constants import SUWood
except ImportError:
# 如果都找不到,创建一个基本的存根
class SUWood:
@staticmethod
def suwood_path(version):
return "."
try:
import bpy
import mathutils
import bmesh
BLENDER_AVAILABLE = True
except ImportError:
BLENDER_AVAILABLE = False
print("⚠️ Blender API 不可用,使用基础几何类")
# 创建存根mathutils模块
class MockMathutils:
class Vector:
def __init__(self, vec):
self.x, self.y, self.z = vec[:3] if len(
vec) >= 3 else (vec + [0, 0])[:3]
def normalized(self):
return self
def dot(self, other):
return 0
class Matrix:
@staticmethod
def Scale(scale, size, axis):
return MockMathutils.Matrix()
@staticmethod
def Translation(vec):
return MockMathutils.Matrix()
@staticmethod
def Rotation(angle, size):
return MockMathutils.Matrix()
def __matmul__(self, other):
return MockMathutils.Matrix()
mathutils = MockMathutils()
# ==================== 内存管理核心类 ====================
class BlenderMemoryManager:
"""Blender 内存管理器 - 应用最佳实践"""
def __init__(self):
self.object_registry = weakref.WeakSet()
self.mesh_registry = weakref.WeakSet()
self.material_registry = weakref.WeakSet()
self.creation_stats = {
'objects_created': 0,
'meshes_created': 0,
'materials_created': 0,
'cleanup_calls': 0,
'failed_cleanups': 0
}
self.last_cleanup_time = time.time()
self.cleanup_interval = 300 # 5分钟
def register_object(self, obj):
"""注册对象到弱引用集合"""
if obj:
self.object_registry.add(obj)
self.creation_stats['objects_created'] += 1
def register_mesh(self, mesh):
"""注册网格到弱引用集合"""
if mesh:
self.mesh_registry.add(mesh)
self.creation_stats['meshes_created'] += 1
def should_cleanup(self):
"""检查是否应该执行清理"""
return time.time() - self.last_cleanup_time > self.cleanup_interval
def cleanup_orphaned_data(self):
"""清理孤立数据"""
if not BLENDER_AVAILABLE:
return 0
cleanup_count = 0
try:
# 清理孤立网格
for mesh in list(bpy.data.meshes):
if mesh.users == 0:
try:
bpy.data.meshes.remove(mesh, do_unlink=True)
cleanup_count += 1
except:
pass
# 清理孤立材质
for material in list(bpy.data.materials):
if material.users == 0:
try:
bpy.data.materials.remove(material, do_unlink=True)
cleanup_count += 1
except:
pass
# 清理孤立纹理
for texture in list(bpy.data.textures):
if texture.users == 0:
try:
bpy.data.textures.remove(texture, do_unlink=True)
cleanup_count += 1
except:
pass
# 更新清理时间
self.last_cleanup_time = time.time()
self.creation_stats['cleanup_calls'] += 1
logger.info(f"清理了 {cleanup_count} 个孤立数据块")
return cleanup_count
except Exception as e:
logger.error(f"清理孤立数据失败: {e}")
self.creation_stats['failed_cleanups'] += 1
return 0
# 全局内存管理器实例
memory_manager = BlenderMemoryManager()
@contextmanager
def safe_blender_operation(operation_name: str):
"""
安全的 Blender 操作上下文管理器
使用方法
with safe_blender_operation("创建网格"):
# 执行 Blender 操作
pass
"""
logger.info(f"开始操作: {operation_name}")
start_time = time.time()
# 记录初始状态
initial_objects = len(bpy.data.objects) if BLENDER_AVAILABLE else 0
initial_meshes = len(bpy.data.meshes) if BLENDER_AVAILABLE else 0
try:
yield
except Exception as e:
logger.error(f"操作失败 {operation_name}: {e}")
# 尝试清理
try:
if memory_manager.should_cleanup():
memory_manager.cleanup_orphaned_data()
gc.collect()
except:
pass
raise
finally:
# 记录最终状态
final_objects = len(bpy.data.objects) if BLENDER_AVAILABLE else 0
final_meshes = len(bpy.data.meshes) if BLENDER_AVAILABLE else 0
elapsed_time = time.time() - start_time
logger.info(f"操作完成: {operation_name}")
logger.info(f"耗时: {elapsed_time:.2f}")
logger.info(f"对象变化: {final_objects - initial_objects}")
logger.info(f"网格变化: {final_meshes - initial_meshes}")
# ==================== 几何类扩展 ====================
class Point3d:
"""3D点类 - 对应Ruby的Geom::Point3d"""
def __init__(self, x: float = 0.0, y: float = 0.0, z: float = 0.0):
self.x = x
self.y = y
self.z = z
@classmethod
def parse(cls, value: str):
"""从字符串解析3D点"""
if not value or value.strip() == "":
return None
# 解析格式: "(x,y,z)" 或 "x,y,z"
clean_value = re.sub(r'[()]*', '', value)
xyz = [float(axis.strip()) for axis in clean_value.split(',')]
# 转换mm为内部单位假设输入是mm
return cls(xyz[0] * 0.001, xyz[1] * 0.001, xyz[2] * 0.001)
def to_s(self, unit: str = "mm", digits: int = -1) -> str:
"""转换为字符串"""
if unit == "cm":
x_val = self.x * 100 # 内部单位转换为cm
y_val = self.y * 100
z_val = self.z * 100
return f"({x_val:.3f}, {y_val:.3f}, {z_val:.3f})"
else: # mm
x_val = self.x * 1000 # 内部单位转换为mm
y_val = self.y * 1000
z_val = self.z * 1000
if digits == -1:
return f"({x_val}, {y_val}, {z_val})"
else:
return f"({x_val:.{digits}f}, {y_val:.{digits}f}, {z_val:.{digits}f})"
def __str__(self):
return self.to_s()
def __repr__(self):
return f"Point3d({self.x}, {self.y}, {self.z})"
class Vector3d:
"""3D向量类 - 对应Ruby的Geom::Vector3d"""
def __init__(self, x: float = 0.0, y: float = 0.0, z: float = 0.0):
self.x = x
self.y = y
self.z = z
@classmethod
def parse(cls, value: str):
"""从字符串解析3D向量"""
if not value or value.strip() == "":
return None
clean_value = re.sub(r'[()]*', '', value)
xyz = [float(axis.strip()) for axis in clean_value.split(',')]
return cls(xyz[0] * 0.001, xyz[1] * 0.001, xyz[2] * 0.001)
def to_s(self, unit: str = "mm") -> str:
"""转换为字符串"""
if unit == "cm":
x_val = self.x * 100 # 内部单位转换为cm
y_val = self.y * 100
z_val = self.z * 100
return f"({x_val:.3f}, {y_val:.3f}, {z_val:.3f})"
elif unit == "in":
return f"({self.x}, {self.y}, {self.z})"
else: # mm
x_val = self.x * 1000 # 内部单位转换为mm
y_val = self.y * 1000
z_val = self.z * 1000
return f"({x_val}, {y_val}, {z_val})"
def normalize(self):
"""归一化向量"""
length = math.sqrt(self.x**2 + self.y**2 + self.z**2)
if length > 0:
return Vector3d(self.x/length, self.y/length, self.z/length)
return Vector3d(0, 0, 0)
def __str__(self):
return self.to_s()
class Transformation:
"""变换矩阵类 - 对应Ruby的Geom::Transformation"""
def __init__(self, origin: Point3d = None, x_axis: Vector3d = None,
y_axis: Vector3d = None, z_axis: Vector3d = None):
self.origin = origin or Point3d(0, 0, 0)
self.x_axis = x_axis or Vector3d(1, 0, 0)
self.y_axis = y_axis or Vector3d(0, 1, 0)
self.z_axis = z_axis or Vector3d(0, 0, 1)
@classmethod
def parse(cls, data: Dict[str, str]):
"""从字典解析变换"""
origin = Point3d.parse(data.get("o"))
x_axis = Vector3d.parse(data.get("x"))
y_axis = Vector3d.parse(data.get("y"))
z_axis = Vector3d.parse(data.get("z"))
return cls(origin, x_axis, y_axis, z_axis)
def store(self, data: Dict[str, str]):
"""存储变换到字典"""
data["o"] = self.origin.to_s("mm")
data["x"] = self.x_axis.to_s("in")
data["y"] = self.y_axis.to_s("in")
data["z"] = self.z_axis.to_s("in")
# ==================== SUWood 材质类型常量 ====================
MAT_TYPE_NORMAL = 0
MAT_TYPE_OBVERSE = 1
MAT_TYPE_NATURE = 2
# ==================== SUWImpl 核心实现类 ====================
class SUWImpl:
"""SUWood核心实现类 - 完整翻译版本,应用内存管理最佳实践"""
_instance = None
_selected_uid = None
_selected_obj = None
_selected_zone = None
_selected_part = None
_scaled_zone = None
_server_path = None
_default_zone = None
_creation_lock = False
_mesh_creation_count = 0
_batch_operation_active = False
def __init__(self):
"""初始化SUWImpl实例"""
# 基础属性
self.parts = {}
self.zones = {}
self.hardwares = {}
self.materials = {}
self.textures = {}
# 内存管理相关
self.object_references = {} # 存储对象名称而非引用
self.mesh_cache = {}
self.material_cache = {}
# 批量操作优化
self.deferred_updates = []
self.batch_size = 50
logger.info("SUWImpl 初始化完成,启用内存管理优化")
@classmethod
def get_instance(cls):
"""获取单例实例"""
if cls._instance is None:
cls._instance = cls()
return cls._instance
def startup(self):
"""启动SUWood系统"""
logger.info("启动SUWood系统...")
if BLENDER_AVAILABLE:
try:
self._create_layers()
self._init_materials()
self._init_default_zone()
logger.info("✅ SUWood系统启动成功")
except Exception as e:
logger.error(f"❌ SUWood系统启动失败: {e}")
raise
else:
logger.warning("⚠️ Blender不可用使用基础模式")
def _create_layers(self):
"""创建图层"""
if not BLENDER_AVAILABLE:
return
try:
# 创建必要的集合
collections = ["SUWood_Parts", "SUWood_Hardware", "SUWood_Zones"]
for collection_name in collections:
if collection_name not in bpy.data.collections:
collection = bpy.data.collections.new(collection_name)
bpy.context.scene.collection.children.link(collection)
logger.info(f"创建集合: {collection_name}")
except Exception as e:
logger.error(f"创建图层失败: {e}")
def _init_materials(self):
"""初始化材质"""
if not BLENDER_AVAILABLE:
return
try:
# 创建默认材质
default_materials = [
("SUWood_Default", (0.8, 0.8, 0.8, 1.0)),
("SUWood_Wood", (0.6, 0.4, 0.2, 1.0)),
("SUWood_Metal", (0.7, 0.7, 0.8, 1.0)),
]
for mat_name, color in default_materials:
if mat_name not in bpy.data.materials:
material = bpy.data.materials.new(mat_name)
material.use_nodes = True
if material.node_tree:
principled = material.node_tree.nodes.get(
"Principled BSDF")
if principled:
principled.inputs[0].default_value = color
self.material_cache[mat_name] = material.name # 存储名称而非引用
memory_manager.register_object(material)
logger.info(f"创建材质: {mat_name}")
except Exception as e:
logger.error(f"初始化材质失败: {e}")
def _init_default_zone(self):
"""初始化默认区域"""
try:
self._default_zone = {
"name": "Default Zone",
"id": "default_zone_001",
"visible": True,
"parts": {},
"hardwares": {}
}
self.zones["default"] = self._default_zone
logger.info("初始化默认区域")
except Exception as e:
logger.error(f"初始化默认区域失败: {e}")
# ==================== 内存管理优化的核心方法 ====================
def validate_mesh_data(self, vertices: List, faces: List) -> bool:
"""验证网格数据有效性"""
try:
# 检查顶点数量
if len(vertices) < 3:
logger.warning("顶点数量不足")
return False
# 检查面数据
for face in faces:
if len(face) < 3:
logger.warning("面顶点数量不足")
return False
# 检查顶点索引有效性
for vertex_index in face:
if vertex_index >= len(vertices) or vertex_index < 0:
logger.warning(f"无效的顶点索引: {vertex_index}")
return False
return True
except Exception as e:
logger.error(f"网格数据验证失败: {e}")
return False
def validate_coplanar_points(self, points: List[Tuple[float, float, float]]) -> bool:
"""验证点是否共面"""
if len(points) < 4:
return True # 3个点总是共面的
try:
# 使用前三个点定义平面
p1 = mathutils.Vector(points[0])
p2 = mathutils.Vector(points[1])
p3 = mathutils.Vector(points[2])
# 计算平面法向量
normal = (p2 - p1).cross(p3 - p1).normalized()
# 检查其他点是否在同一平面上
tolerance = 1e-6
for point in points[3:]:
p = mathutils.Vector(point)
distance = abs((p - p1).dot(normal))
if distance > tolerance:
return False
return True
except Exception as e:
logger.error(f"共面检查失败: {e}")
return False
@contextmanager
def atomic_mesh_creation(self, name: str):
"""原子网格创建上下文管理器"""
mesh = None
obj = None
try:
# 第1步创建网格数据
mesh = bpy.data.meshes.new(name)
memory_manager.register_mesh(mesh)
yield mesh
except Exception as e:
logger.error(f"原子网格创建失败: {e}")
# 自动清理
if obj:
self._cleanup_object_safe(obj)
if mesh:
self._cleanup_mesh_safe(mesh)
raise
def _cleanup_mesh_safe(self, mesh):
"""安全清理网格"""
try:
if mesh and mesh.name in bpy.data.meshes:
bpy.data.meshes.remove(mesh, do_unlink=True)
memory_manager.creation_stats['cleanup_calls'] += 1
logger.info(f"清理网格: {mesh.name}")
except Exception as e:
logger.error(f"网格清理失败: {e}")
memory_manager.creation_stats['failed_cleanups'] += 1
def _cleanup_object_safe(self, obj):
"""安全清理对象"""
try:
if obj and obj.name in bpy.data.objects:
# 从所有集合中移除
for collection in obj.users_collection:
collection.objects.unlink(obj)
# 删除对象
bpy.data.objects.remove(obj, do_unlink=True)
memory_manager.creation_stats['cleanup_calls'] += 1
logger.info(f"清理对象: {obj.name}")
except Exception as e:
logger.error(f"对象清理失败: {e}")
memory_manager.creation_stats['failed_cleanups'] += 1
def _create_single_mesh_atomic(self, container: Any, surface: Dict[str, Any],
color: str = None, scale: float = None, angle: float = None,
series: List = None, uid: str = None, zid: Any = None,
pid: Any = None, child: Any = None, options: Dict = None) -> Any:
"""优化的原子性mesh创建"""
with safe_blender_operation(f"创建网格 uid={uid}"):
try:
# 增加创建计数
self.__class__._mesh_creation_count += 1
creation_count = self.__class__._mesh_creation_count
# 定期清理
if creation_count % 100 == 0:
logger.info(f"执行定期清理,当前创建数: {creation_count}")
memory_manager.cleanup_orphaned_data()
gc.collect()
# 数据验证
segs = surface.get("segs", [])
points = []
for seg in segs:
if isinstance(seg, list) and len(seg) >= 1:
point_str = seg[0] if isinstance(
seg[0], str) else str(seg[0])
try:
point = Point3d.parse(point_str)
if point:
points.append(point)
except Exception as e:
logger.warning(f"点解析失败: {point_str}")
if len(points) < 3:
logger.warning(f"有效点数不足: {len(points)}")
return None
# 面积验证
area = self._calculate_face_area(points)
if area < 1e-6:
logger.warning(f"面积过小: {area}")
return None
# 验证几何数据
vertices = [(float(p.x), float(p.y), float(p.z))
for p in points]
faces = [list(range(len(vertices)))] if len(
vertices) >= 3 else []
if not self.validate_mesh_data(vertices, faces):
logger.warning("网格数据验证失败")
return None
# 原子性创建
timestamp = hash(
str(points[0]) + str(uid) + str(child) + str(creation_count))
mesh_name = f"SUWMesh_{abs(timestamp) % 10000}"
with self.atomic_mesh_creation(mesh_name) as mesh:
# 设置几何数据
mesh.from_pydata(vertices, [], faces)
# 创建UV坐标
if len(vertices) >= 3:
try:
uv_layer = mesh.uv_layers.new(name="UVMap")
self._generate_uv_coordinates(mesh, uv_layer)
except Exception as e:
logger.warning(f"UV坐标创建失败: {e}")
# 安全更新网格
if not self._safe_mesh_update(mesh):
raise RuntimeError("网格更新失败")
# 创建对象
obj_name = f"SUWObj_{abs(timestamp) % 10000}"
obj = bpy.data.objects.new(obj_name, mesh)
memory_manager.register_object(obj)
# 添加到集合
if hasattr(container, 'objects'):
container.objects.link(obj)
else:
bpy.context.scene.collection.objects.link(obj)
# 设置属性
self._set_object_attributes_safe(
obj, uid, zid, pid, child, area, creation_count)
# 应用材质
if color:
self._apply_material_safe(obj, color)
# 存储对象引用(名称而非对象)
self.object_references[obj_name] = {
'uid': uid,
'type': 'mesh',
'creation_time': time.time()
}
logger.info(f"✅ 网格创建成功: {obj.name}, {len(vertices)}顶点")
return obj
except Exception as e:
logger.error(f"网格创建失败: {e}")
return None
def _generate_uv_coordinates(self, mesh, uv_layer):
"""生成UV坐标"""
try:
for poly in mesh.polygons:
for i, loop_index in enumerate(poly.loop_indices):
vertex_count = len(poly.vertices)
if vertex_count == 4: # 四边形
uv_coords = [(0, 0), (1, 0), (1, 1), (0, 1)]
uv_layer.data[loop_index].uv = uv_coords[i % 4]
elif vertex_count == 3: # 三角形
uv_coords = [(0, 0), (1, 0), (0.5, 1)]
uv_layer.data[loop_index].uv = uv_coords[i % 3]
else: # 其他多边形
angle = (i / vertex_count) * 2 * math.pi
u = (math.cos(angle) + 1) * 0.5
v = (math.sin(angle) + 1) * 0.5
uv_layer.data[loop_index].uv = (u, v)
except Exception as e:
logger.error(f"UV坐标生成失败: {e}")
def _safe_mesh_update(self, mesh, max_retries=3):
"""安全更新网格"""
for attempt in range(max_retries):
try:
# 检查网格是否仍然有效
if not mesh or mesh.name not in bpy.data.meshes:
return False
# 更新网格
mesh.update()
# 验证更新结果
if not mesh.vertices:
raise RuntimeError("网格更新后无顶点")
return True
except Exception as e:
logger.warning(f"网格更新失败,尝试 {attempt + 1}/{max_retries}: {e}")
if attempt < max_retries - 1:
time.sleep(0.1) # 短暂等待
else:
return False
return False
def _set_object_attributes_safe(self, obj, uid, zid, pid, child, area, creation_count):
"""安全设置对象属性"""
try:
obj["uid"] = str(uid) if uid else "unknown"
obj["zid"] = str(zid) if zid else "unknown"
obj["pid"] = str(pid) if pid else "unknown"
obj["child"] = str(child) if child else "unknown"
obj["area"] = float(area) if area else 0.0
obj["creation_index"] = creation_count
obj["creation_time"] = time.time()
obj["memory_managed"] = True
except Exception as e:
logger.error(f"设置对象属性失败: {e}")
def _apply_material_safe(self, obj, color):
"""安全应用材质"""
try:
material = self.get_texture(color)
if material:
if obj.data.materials:
obj.data.materials[0] = material
else:
obj.data.materials.append(material)
logger.info(f"材质应用成功: {color}")
else:
self._apply_simple_material(obj, color)
except Exception as e:
logger.error(f"材质应用失败: {e}")
try:
self._apply_simple_material(obj, color)
except:
pass
def _apply_simple_material(self, obj: Any, color: str):
"""应用简单材质"""
try:
# 创建或获取缓存的材质
mat_name = f"SUWMat_{color}" if color else "SUWMat_default"
if mat_name in self.material_cache:
material_name = self.material_cache[mat_name]
if material_name in bpy.data.materials:
material = bpy.data.materials[material_name]
obj.data.materials.append(material)
return
# 创建新材质
material = bpy.data.materials.new(mat_name)
material.use_nodes = True
if material.node_tree and material.node_tree.nodes:
principled = material.node_tree.nodes.get("Principled BSDF")
if principled:
if color and len(color) >= 6:
try:
# 解析颜色
r = int(color[0:2], 16) / 255.0
g = int(color[2:4], 16) / 255.0
b = int(color[4:6], 16) / 255.0
principled.inputs[0].default_value = (r, g, b, 1.0)
except:
principled.inputs[0].default_value = (
0.8, 0.8, 0.8, 1.0)
else:
principled.inputs[0].default_value = (
0.8, 0.8, 0.8, 1.0)
# 缓存材质
self.material_cache[mat_name] = material.name
memory_manager.register_object(material)
# 应用材质
obj.data.materials.append(material)
logger.info(f"简单材质创建并应用: {mat_name}")
except Exception as e:
logger.error(f"简单材质应用失败: {e}")
# ==================== 优化的删除和清理方法 ====================
def _force_delete_objects_optimized(self, objects_to_delete: List[Any]):
"""优化的强制删除对象"""
try:
if not BLENDER_AVAILABLE:
return
logger.info(f"开始优化删除 {len(objects_to_delete)} 个对象")
# 预处理:收集对象名称
object_names = []
for obj in objects_to_delete:
if obj and hasattr(obj, 'name'):
object_names.append(obj.name)
# 批量删除
deleted_count = 0
for obj_name in object_names:
try:
if obj_name in bpy.data.objects:
obj = bpy.data.objects[obj_name]
# 从集合中移除
collections_to_unlink = list(obj.users_collection)
for collection in collections_to_unlink:
try:
collection.objects.unlink(obj)
except:
pass
# 删除对象
bpy.data.objects.remove(obj, do_unlink=True)
deleted_count += 1
# 从引用中移除
if obj_name in self.object_references:
del self.object_references[obj_name]
except Exception as e:
logger.error(f"删除对象失败 {obj_name}: {e}")
# 更新依赖图
if deleted_count > 0:
self._update_dependency_graph(full_update=True)
logger.info(f"✅ 优化删除完成: {deleted_count} 个对象")
except Exception as e:
logger.error(f"优化删除失败: {e}")
def _update_dependency_graph(self, full_update: bool = False):
"""更新Blender依赖图"""
try:
if not BLENDER_AVAILABLE:
return
if full_update:
logger.info("执行全局依赖图更新")
bpy.context.view_layer.update()
bpy.context.evaluated_depsgraph_get().update()
# 刷新视图
try:
for area in bpy.context.screen.areas:
if area.type in ['VIEW_3D', 'OUTLINER']:
area.tag_redraw()
except:
pass
logger.info("全局依赖图更新完成")
else:
# 快速更新
bpy.context.view_layer.update()
except Exception as e:
logger.error(f"依赖图更新失败: {e}")
# ==================== 保持原有业务逻辑的方法 ====================
def get_zones(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""获取区域信息"""
return self.zones
def get_parts(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""获取零件信息"""
return self.parts
def get_hardwares(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""获取硬件信息"""
return self.hardwares
def get_texture(self, key: str):
"""获取纹理材质"""
if not BLENDER_AVAILABLE:
return None
try:
# 从缓存中获取
if key in self.material_cache:
material_name = self.material_cache[key]
if material_name in bpy.data.materials:
return bpy.data.materials[material_name]
# 查找现有材质
for material in bpy.data.materials:
if key in material.name:
self.material_cache[key] = material.name
return material
return None
except Exception as e:
logger.error(f"获取纹理失败: {e}")
return None
def create_face(self, container: Any, surface: Dict[str, Any], color: str = None,
scale: float = None, angle: float = None, series: List = None,
uid: str = None, zid: Any = None, pid: Any = None,
child: Any = None) -> Any:
"""创建面 - 使用优化的内存管理"""
try:
return self._create_single_mesh_atomic(
container, surface, color, scale, angle,
series, uid, zid, pid, child
)
except Exception as e:
logger.error(f"创建面失败: {e}")
return None
def _calculate_face_area(self, points: List[Point3d]) -> float:
"""计算面积"""
try:
if len(points) < 3:
return 0.0
# 使用三角剖分计算面积
area = 0.0
for i in range(1, len(points) - 1):
p1 = points[0]
p2 = points[i]
p3 = points[i + 1]
# 计算三角形面积
v1 = Vector3d(p2.x - p1.x, p2.y - p1.y, p2.z - p1.z)
v2 = Vector3d(p3.x - p1.x, p3.y - p1.y, p3.z - p1.z)
# 叉积的模长的一半就是三角形面积
cross_x = v1.y * v2.z - v1.z * v2.y
cross_y = v1.z * v2.x - v1.x * v2.z
cross_z = v1.x * v2.y - v1.y * v2.x
triangle_area = 0.5 * \
math.sqrt(cross_x**2 + cross_y**2 + cross_z**2)
area += triangle_area
return area
except Exception as e:
logger.error(f"计算面积失败: {e}")
return 0.0
# ==================== 系统管理方法 ====================
def get_creation_stats(self) -> Dict[str, Any]:
"""获取创建统计信息"""
try:
stats = {
"creation_count": self.__class__._mesh_creation_count,
"object_references": len(self.object_references),
"mesh_cache_size": len(self.mesh_cache),
"material_cache_size": len(self.material_cache),
"memory_manager_stats": memory_manager.creation_stats.copy(),
"blender_available": BLENDER_AVAILABLE
}
if BLENDER_AVAILABLE:
stats["total_objects"] = len(bpy.data.objects)
stats["total_meshes"] = len(bpy.data.meshes)
stats["total_materials"] = len(bpy.data.materials)
return stats
except Exception as e:
logger.error(f"获取统计信息失败: {e}")
return {"error": str(e)}
def force_cleanup(self):
"""强制清理"""
try:
logger.info("开始强制清理")
# 清理孤立数据
cleanup_count = memory_manager.cleanup_orphaned_data()
# 清理缓存
self.mesh_cache.clear()
# 清理过期的对象引用
current_time = time.time()
expired_refs = []
for obj_name, ref_info in self.object_references.items():
if current_time - ref_info.get('creation_time', 0) > 3600: # 1小时
expired_refs.append(obj_name)
for obj_name in expired_refs:
del self.object_references[obj_name]
# 强制垃圾回收
gc.collect()
# 更新依赖图
if BLENDER_AVAILABLE:
self._update_dependency_graph(full_update=True)
logger.info(
f"强制清理完成: 清理了 {cleanup_count} 个数据块,{len(expired_refs)} 个过期引用")
except Exception as e:
logger.error(f"强制清理失败: {e}")
def diagnose_system_state(self):
"""诊断系统状态"""
try:
logger.info("=== 系统状态诊断 ===")
# 内存使用情况
stats = self.get_creation_stats()
for key, value in stats.items():
logger.info(f"{key}: {value}")
# 检查潜在问题
issues = []
if BLENDER_AVAILABLE:
# 检查孤立数据
orphaned_meshes = [m for m in bpy.data.meshes if m.users == 0]
if orphaned_meshes:
issues.append(f"发现 {len(orphaned_meshes)} 个孤立网格")
# 检查空网格
empty_meshes = [m for m in bpy.data.meshes if not m.vertices]
if empty_meshes:
issues.append(f"发现 {len(empty_meshes)} 个空网格")
# 检查总顶点数
total_vertices = sum(len(m.vertices) for m in bpy.data.meshes)
if total_vertices > 1000000: # 100万顶点
issues.append(f"顶点数量过多: {total_vertices}")
if issues:
logger.warning("发现问题:")
for issue in issues:
logger.warning(f" - {issue}")
else:
logger.info("✅ 系统状态正常")
return issues
except Exception as e:
logger.error(f"系统诊断失败: {e}")
return [f"诊断失败: {e}"]
# ==================== 其他必要的方法存根 ====================
def c09(self, data: Dict[str, Any]):
"""删除操作 - 使用优化的内存管理"""
try:
uid = data.get("uid")
typ = data.get("typ", "uid")
oid = data.get("oid")
logger.info(f"执行删除操作: uid={uid}, typ={typ}, oid={oid}")
# 查找要删除的对象
objects_to_delete = self._find_objects_for_deletion(uid, typ, oid)
if objects_to_delete:
# 使用优化的删除方法
self._force_delete_objects_optimized(objects_to_delete)
# 清理数据
self._safe_data_cleanup(uid, typ, oid)
except Exception as e:
logger.error(f"删除操作失败: {e}")
def _find_objects_for_deletion(self, uid: str, typ: str, oid: Any) -> List[Any]:
"""查找要删除的对象"""
try:
if not BLENDER_AVAILABLE:
return []
objects_to_delete = []
# 根据对象引用查找
for obj_name, ref_info in self.object_references.items():
if ref_info.get('uid') == uid:
if obj_name in bpy.data.objects:
objects_to_delete.append(bpy.data.objects[obj_name])
# 根据对象属性查找
for obj in bpy.data.objects:
if obj.get("uid") == uid:
objects_to_delete.append(obj)
logger.info(f"找到 {len(objects_to_delete)} 个要删除的对象")
return objects_to_delete
except Exception as e:
logger.error(f"查找删除对象失败: {e}")
return []
def _safe_data_cleanup(self, uid: str, typ: str, oid: Any):
"""安全的数据清理"""
try:
cleanup_count = 0
# 清理内部数据结构
data_fields = [self.parts, self.zones, self.hardwares]
for data_dict in data_fields:
if uid in data_dict:
del data_dict[uid]
cleanup_count += 1
# 清理对象引用
refs_to_remove = []
for obj_name, ref_info in self.object_references.items():
if ref_info.get('uid') == uid:
refs_to_remove.append(obj_name)
for obj_name in refs_to_remove:
del self.object_references[obj_name]
cleanup_count += 1
logger.info(f"数据清理完成: {cleanup_count}")
except Exception as e:
logger.error(f"数据清理失败: {e}")
# ==================== 其他业务逻辑方法的存根 ====================
def c00(self, data: Dict[str, Any]):
"""业务方法存根"""
pass
def c01(self, data: Dict[str, Any]):
"""创建区域"""
try:
uid = data.get("uid")
zone_data = data.get("zone", {})
with safe_blender_operation(f"创建区域 uid={uid}"):
# 创建区域集合
collection_name = f"Zone_{uid}"
if BLENDER_AVAILABLE and collection_name not in bpy.data.collections:
collection = bpy.data.collections.new(collection_name)
bpy.context.scene.collection.children.link(collection)
memory_manager.register_object(collection)
# 存储区域数据
self.zones[uid] = {
"collection_name": collection_name,
"data": zone_data,
"parts": {},
"creation_time": time.time()
}
logger.info(f"创建区域成功: {uid}")
except Exception as e:
logger.error(f"创建区域失败: {e}")
def c02(self, data: Dict[str, Any]):
"""创建零件"""
try:
uid = data.get("uid")
parts_data = data.get("parts", {})
with safe_blender_operation(f"创建零件 uid={uid}"):
# 批量创建零件
created_parts = {}
for part_id, part_data in parts_data.items():
try:
# 创建零件集合
collection_name = f"Part_{uid}_{part_id}"
if BLENDER_AVAILABLE and collection_name not in bpy.data.collections:
collection = bpy.data.collections.new(
collection_name)
# 添加到对应的区域集合
if uid in self.zones:
zone_collection_name = self.zones[uid]["collection_name"]
if zone_collection_name in bpy.data.collections:
parent_collection = bpy.data.collections[zone_collection_name]
parent_collection.children.link(collection)
else:
bpy.context.scene.collection.children.link(
collection)
memory_manager.register_object(collection)
created_parts[part_id] = {
"collection_name": collection_name,
"data": part_data,
"creation_time": time.time()
}
except Exception as e:
logger.error(f"创建零件失败 {part_id}: {e}")
# 存储零件数据
self.parts[uid] = created_parts
logger.info(f"创建零件成功: {uid}, {len(created_parts)} 个零件")
except Exception as e:
logger.error(f"创建零件操作失败: {e}")
def c03(self, data: Dict[str, Any]):
"""创建硬件"""
try:
uid = data.get("uid")
hardware_data = data.get("hardwares", {})
with safe_blender_operation(f"创建硬件 uid={uid}"):
created_hardware = {}
for hw_id, hw_data in hardware_data.items():
try:
# 创建硬件集合
collection_name = f"Hardware_{uid}_{hw_id}"
if BLENDER_AVAILABLE and collection_name not in bpy.data.collections:
collection = bpy.data.collections.new(
collection_name)
bpy.context.scene.collection.children.link(
collection)
memory_manager.register_object(collection)
created_hardware[hw_id] = {
"collection_name": collection_name,
"data": hw_data,
"creation_time": time.time()
}
except Exception as e:
logger.error(f"创建硬件失败 {hw_id}: {e}")
# 存储硬件数据
self.hardwares[uid] = created_hardware
logger.info(f"创建硬件成功: {uid}, {len(created_hardware)} 个硬件")
except Exception as e:
logger.error(f"创建硬件操作失败: {e}")
def c04(self, data: Dict[str, Any]):
"""创建面和边 - 优化版本"""
try:
uid = data.get("uid")
parts_data = data.get("parts", {})
with safe_blender_operation(f"创建面和边 uid={uid}"):
# 批量创建优化
total_created = 0
for part_id, part_data in parts_data.items():
try:
# 获取零件集合
collection = None
if BLENDER_AVAILABLE:
collection_name = f"Part_{uid}_{part_id}"
if collection_name in bpy.data.collections:
collection = bpy.data.collections[collection_name]
# 创建面
surfaces = part_data.get("surfaces", [])
for surface in surfaces:
try:
obj = self.create_face(
collection or bpy.context.scene.collection,
surface,
surface.get("color"),
surface.get("scale"),
surface.get("angle"),
uid=uid,
pid=part_id
)
if obj:
total_created += 1
except Exception as e:
logger.error(f"创建面失败: {e}")
# 创建边
edges = part_data.get("edges", [])
for edge in edges:
try:
self.create_edges(
collection or bpy.context.scene.collection,
[edge]
)
total_created += 1
except Exception as e:
logger.error(f"创建边失败: {e}")
except Exception as e:
logger.error(f"处理零件失败 {part_id}: {e}")
logger.info(f"创建面和边完成: {total_created} 个对象")
# 定期清理
if total_created > 50:
memory_manager.cleanup_orphaned_data()
except Exception as e:
logger.error(f"创建面和边操作失败: {e}")
def create_edges(self, container: Any, edges_data: List) -> List:
"""创建边"""
try:
if not BLENDER_AVAILABLE:
return []
created_edges = []
for edge_data in edges_data:
try:
# 解析边的起点和终点
start_point = Point3d.parse(
edge_data.get("start", "0,0,0"))
end_point = Point3d.parse(edge_data.get("end", "0,0,0"))
if start_point and end_point:
edge_obj = self._create_line_edge(
container, start_point, end_point)
if edge_obj:
created_edges.append(edge_obj)
memory_manager.register_object(edge_obj)
except Exception as e:
logger.error(f"创建单条边失败: {e}")
return created_edges
except Exception as e:
logger.error(f"创建边失败: {e}")
return []
def _create_line_edge(self, container: Any, start: Point3d, end: Point3d) -> Any:
"""创建线边"""
try:
if not BLENDER_AVAILABLE:
return None
# 创建边网格
mesh_name = f"SUWEdge_{hash(str(start) + str(end)) % 10000}"
mesh = bpy.data.meshes.new(mesh_name)
memory_manager.register_mesh(mesh)
# 设置边的顶点和边
vertices = [
(start.x, start.y, start.z),
(end.x, end.y, end.z)
]
edges = [(0, 1)]
mesh.from_pydata(vertices, edges, [])
mesh.update()
# 创建对象
obj_name = f"SUWEdgeObj_{hash(str(start) + str(end)) % 10000}"
obj = bpy.data.objects.new(obj_name, mesh)
memory_manager.register_object(obj)
# 添加到集合
if hasattr(container, 'objects'):
container.objects.link(obj)
else:
bpy.context.scene.collection.objects.link(obj)
return obj
except Exception as e:
logger.error(f"创建线边失败: {e}")
return None
# ==================== 选择和可见性管理 ====================
def sel_clear(self):
"""清除选择"""
try:
if BLENDER_AVAILABLE:
bpy.ops.object.select_all(action='DESELECT')
self._selected_uid = None
self._selected_obj = None
self._selected_zone = None
self._selected_part = None
logger.info("选择已清除")
except Exception as e:
logger.error(f"清除选择失败: {e}")
def sel_local(self, obj: Any):
"""选择本地对象"""
try:
if BLENDER_AVAILABLE and obj:
# 清除当前选择
bpy.ops.object.select_all(action='DESELECT')
# 选择指定对象
if obj.name in bpy.data.objects:
bpy.data.objects[obj.name].select_set(True)
bpy.context.view_layer.objects.active = bpy.data.objects[obj.name]
# 更新选择状态
self._selected_obj = obj.name # 存储名称而非引用
logger.info(f"选择对象: {obj.name}")
except Exception as e:
logger.error(f"选择对象失败: {e}")
# ==================== 材质和纹理管理 ====================
def add_mat_rgb(self, mat_id: str, alpha: float, r: int, g: int, b: int):
"""添加RGB材质"""
try:
if not BLENDER_AVAILABLE:
return
# 检查材质是否已存在
if mat_id in self.material_cache:
material_name = self.material_cache[mat_id]
if material_name in bpy.data.materials:
return bpy.data.materials[material_name]
# 创建新材质
material = bpy.data.materials.new(mat_id)
material.use_nodes = True
# 设置颜色
if material.node_tree:
principled = material.node_tree.nodes.get("Principled BSDF")
if principled:
color = (r/255.0, g/255.0, b/255.0, alpha)
principled.inputs[0].default_value = color
# 设置透明度
if alpha < 1.0:
material.blend_method = 'BLEND'
# Alpha input
principled.inputs[21].default_value = alpha
# 缓存材质
self.material_cache[mat_id] = material.name
memory_manager.register_object(material)
logger.info(f"创建RGB材质: {mat_id}")
return material
except Exception as e:
logger.error(f"创建RGB材质失败: {e}")
return None
# ==================== 系统监控和维护 ====================
def monitor_memory_usage(self):
"""监控内存使用情况"""
try:
if not BLENDER_AVAILABLE:
return
# 检查是否需要清理
if memory_manager.should_cleanup():
logger.info("触发定期内存清理")
cleanup_count = memory_manager.cleanup_orphaned_data()
if cleanup_count > 0:
logger.info(f"清理了 {cleanup_count} 个孤立数据")
gc.collect()
# 检查内存压力
total_objects = len(bpy.data.objects)
total_meshes = len(bpy.data.meshes)
if total_objects > 10000 or total_meshes > 5000:
logger.warning(
f"内存压力警告: {total_objects} 个对象, {total_meshes} 个网格")
# 执行强制清理
self.force_cleanup()
except Exception as e:
logger.error(f"内存监控失败: {e}")
def get_memory_report(self) -> Dict[str, Any]:
"""获取内存报告"""
try:
report = {
"timestamp": time.time(),
"creation_stats": self.get_creation_stats(),
"system_diagnosis": self.diagnose_system_state(),
"memory_manager": {
"object_registry_size": len(memory_manager.object_registry),
"mesh_registry_size": len(memory_manager.mesh_registry),
"last_cleanup": memory_manager.last_cleanup_time,
"cleanup_interval": memory_manager.cleanup_interval
}
}
if BLENDER_AVAILABLE:
report["blender_data"] = {
"objects": len(bpy.data.objects),
"meshes": len(bpy.data.meshes),
"materials": len(bpy.data.materials),
"textures": len(bpy.data.textures),
"images": len(bpy.data.images)
}
return report
except Exception as e:
logger.error(f"生成内存报告失败: {e}")
return {"error": str(e)}
# ==================== 属性访问器 ====================
@property
def selected_uid(self):
return self._selected_uid
@property
def selected_zone(self):
return self._selected_zone
@property
def selected_part(self):
return self._selected_part
@property
def selected_obj(self):
# 返回对象名称而非引用,避免悬空引用
return self._selected_obj
@property
def server_path(self):
return self._server_path
@property
def default_zone(self):
return self._default_zone
# ==================== 兼容性方法存根 ====================
def c05(self, data: Dict[str, Any]):
"""业务方法存根 - 保持兼容性"""
logger.info("执行c05操作")
def c06(self, data: Dict[str, Any]):
"""业务方法存根 - 保持兼容性"""
logger.info("执行c06操作")
def c07(self, data: Dict[str, Any]):
"""业务方法存根 - 保持兼容性"""
logger.info("执行c07操作")
def c08(self, data: Dict[str, Any]):
"""业务方法存根 - 保持兼容性"""
logger.info("执行c08操作")
def c10(self, data: Dict[str, Any]):
"""设置门信息"""
try:
uid = data.get("uid")
doors = data.get("drs", [])
logger.info(f"设置门信息: uid={uid}, {len(doors)} 个门")
# 存储门信息
if uid not in self.parts:
self.parts[uid] = {}
self.parts[uid]["doors"] = doors
except Exception as e:
logger.error(f"设置门信息失败: {e}")
def show_message(self, data: Dict[str, Any]):
"""显示消息"""
try:
message = data.get("message", "")
logger.info(f"显示消息: {message}")
except Exception as e:
logger.error(f"显示消息失败: {e}")
def set_config(self, data: Dict[str, Any]):
"""设置配置"""
try:
config = data.get("config", {})
logger.info(f"设置配置: {len(config)} 个配置项")
# 应用内存管理相关配置
if "memory_cleanup_interval" in config:
memory_manager.cleanup_interval = config["memory_cleanup_interval"]
if "batch_size" in config:
self.batch_size = config["batch_size"]
except Exception as e:
logger.error(f"设置配置失败: {e}")
# ==================== 清理和销毁 ====================
def shutdown(self):
"""关闭系统"""
try:
logger.info("开始关闭SUWood系统")
# 执行最终清理
self.force_cleanup()
# 清理所有缓存
self.mesh_cache.clear()
self.material_cache.clear()
self.object_references.clear()
# 清理数据结构
self.parts.clear()
self.zones.clear()
self.hardwares.clear()
logger.info("✅ SUWood系统关闭完成")
except Exception as e:
logger.error(f"关闭系统失败: {e}")
def __del__(self):
"""析构函数"""
try:
self.shutdown()
except:
pass
# ==================== 模块级别的便利函数 ====================
def create_suw_instance():
"""创建SUW实例的便利函数"""
return SUWImpl.get_instance()
def cleanup_blender_memory():
"""清理Blender内存的便利函数"""
try:
if BLENDER_AVAILABLE:
cleanup_count = memory_manager.cleanup_orphaned_data()
gc.collect()
logger.info(f"清理了 {cleanup_count} 个孤立数据")
return cleanup_count
return 0
except Exception as e:
logger.error(f"清理内存失败: {e}")
return 0
def get_memory_usage_summary():
"""获取内存使用摘要"""
try:
if BLENDER_AVAILABLE:
return {
"objects": len(bpy.data.objects),
"meshes": len(bpy.data.meshes),
"materials": len(bpy.data.materials),
"memory_manager_stats": memory_manager.creation_stats.copy()
}
return {"blender_available": False}
except Exception as e:
logger.error(f"获取内存使用摘要失败: {e}")
return {"error": str(e)}
# ==================== 主程序入口 ====================
if __name__ == "__main__":
# 测试代码
try:
logger.info("开始测试SUWImpl内存管理")
# 创建实例
suw = create_suw_instance()
suw.startup()
# 获取内存报告
report = suw.get_memory_report()
logger.info(f"内存报告: {report}")
# 执行诊断
issues = suw.diagnose_system_state()
if not issues:
logger.info("✅ 系统状态正常")
# 清理测试
cleanup_count = cleanup_blender_memory()
logger.info(f"清理测试完成: {cleanup_count}")
except Exception as e:
logger.error(f"测试失败: {e}")
finally:
logger.info("测试完成")