suwoodblender/blenderpython/suw_impl - 副本 (17).py

1712 lines
58 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SUW Implementation - Python翻译版本
原文件: SUWImpl.rb (2019行)
用途: 核心实现类SUWood的主要功能
翻译进度: Phase 1 - 几何类和基础框架
内存管理优化: 应用 Blender Python API 最佳实践
"""
import re
import math
import logging
import time
import gc
import weakref
from typing import Optional, Any, Dict, List, Tuple, Union
from contextlib import contextmanager
# 设置日志
logger = logging.getLogger(__name__)
# 尝试相对导入,失败则使用绝对导入
try:
from .suw_constants import SUWood
except ImportError:
try:
from suw_constants import SUWood
except ImportError:
# 如果都找不到,创建一个基本的存根
class SUWood:
@staticmethod
def suwood_path(version):
return "."
try:
    import bpy
    import mathutils
    import bmesh
    BLENDER_AVAILABLE = True
except ImportError:
    BLENDER_AVAILABLE = False
    print("⚠️ Blender API 不可用,使用基础几何类")

    class MockMathutils:
        """Minimal stand-in for the ``mathutils`` module when Blender is absent.

        Only the operations this file relies on are provided. ``Vector`` does
        real arithmetic so geometric checks give meaningful answers outside
        Blender; ``Matrix`` remains an inert placeholder.
        """

        class Vector:
            """Small 3-component vector with the arithmetic used in this file."""

            def __init__(self, vec=(0.0, 0.0, 0.0)):
                # list() lets tuples and other sequences be padded; short
                # input is zero-filled, extra components are ignored.
                self.x, self.y, self.z = (list(vec) + [0, 0, 0])[:3]

            def __iter__(self):
                return iter((self.x, self.y, self.z))

            def __repr__(self):
                return f"Vector(({self.x}, {self.y}, {self.z}))"

            def __add__(self, other):
                return MockMathutils.Vector(
                    (self.x + other.x, self.y + other.y, self.z + other.z))

            def __sub__(self, other):
                return MockMathutils.Vector(
                    (self.x - other.x, self.y - other.y, self.z - other.z))

            @property
            def length(self):
                return math.sqrt(self.x ** 2 + self.y ** 2 + self.z ** 2)

            def normalized(self):
                """Return a unit-length copy (a zero vector stays zero)."""
                size = self.length
                if size == 0:
                    return MockMathutils.Vector((0.0, 0.0, 0.0))
                return MockMathutils.Vector(
                    (self.x / size, self.y / size, self.z / size))

            def dot(self, other):
                return self.x * other.x + self.y * other.y + self.z * other.z

            def cross(self, other):
                return MockMathutils.Vector((
                    self.y * other.z - self.z * other.y,
                    self.z * other.x - self.x * other.z,
                    self.x * other.y - self.y * other.x,
                ))

        class Matrix:
            """Inert placeholder: every factory and product yields a new Matrix."""

            @staticmethod
            def Scale(scale, size, axis):
                return MockMathutils.Matrix()

            @staticmethod
            def Translation(vec):
                return MockMathutils.Matrix()

            @staticmethod
            def Rotation(angle, size):
                return MockMathutils.Matrix()

            def __matmul__(self, other):
                return MockMathutils.Matrix()

    mathutils = MockMathutils()
# ==================== 内存管理核心类 ====================
class BlenderMemoryManager:
    """Track created data-blocks and purge unused ones from ``bpy.data``.

    Registries are ``weakref.WeakSet`` instances so tracking never keeps an
    item alive. Counters in ``creation_stats`` are updated on every
    registration, whether or not the item could be weakly referenced.
    """

    def __init__(self):
        self.object_registry = weakref.WeakSet()
        self.mesh_registry = weakref.WeakSet()
        self.material_registry = weakref.WeakSet()
        self.creation_stats = {
            'objects_created': 0,
            'meshes_created': 0,
            'materials_created': 0,
            'cleanup_calls': 0,
            'failed_cleanups': 0
        }
        self.last_cleanup_time = time.time()
        self.cleanup_interval = 300  # seconds (5 minutes)

    @staticmethod
    def _track(registry, item):
        """Add *item* to *registry* when it supports weak references."""
        try:
            registry.add(item)
        except TypeError:
            # Not weak-referenceable (or unhashable): skip tracking rather
            # than abort the caller's creation flow; stats still count it.
            pass

    def register_object(self, obj):
        """Record a newly created object (falsy values are ignored)."""
        if obj:
            self._track(self.object_registry, obj)
            self.creation_stats['objects_created'] += 1

    def register_mesh(self, mesh):
        """Record a newly created mesh (falsy values are ignored)."""
        if mesh:
            self._track(self.mesh_registry, mesh)
            self.creation_stats['meshes_created'] += 1

    def should_cleanup(self):
        """Return True once ``cleanup_interval`` seconds have passed since the last cleanup."""
        return time.time() - self.last_cleanup_time > self.cleanup_interval

    @staticmethod
    def _purge_unused(collection):
        """Remove every zero-user block from *collection*; return how many were removed."""
        removed = 0
        # Snapshot first: removing while iterating the live collection is unsafe.
        for block in list(collection):
            if block.users != 0:
                continue
            try:
                collection.remove(block, do_unlink=True)
                removed += 1
            except Exception as exc:
                # Best effort: one stubborn block must not stop the sweep.
                logger.warning(f"移除孤立数据块失败: {exc}")
        return removed

    def cleanup_orphaned_data(self):
        """Remove meshes, materials and textures that have no users.

        Returns:
            Number of data-blocks removed; 0 when Blender is unavailable or
            the sweep itself fails.
        """
        if not BLENDER_AVAILABLE:
            return 0
        try:
            cleanup_count = 0
            cleanup_count += self._purge_unused(bpy.data.meshes)
            cleanup_count += self._purge_unused(bpy.data.materials)
            cleanup_count += self._purge_unused(bpy.data.textures)
            self.last_cleanup_time = time.time()
            self.creation_stats['cleanup_calls'] += 1
            logger.info(f"清理了 {cleanup_count} 个孤立数据块")
            return cleanup_count
        except Exception as e:
            logger.error(f"清理孤立数据失败: {e}")
            self.creation_stats['failed_cleanups'] += 1
            return 0
# 全局内存管理器实例
memory_manager = BlenderMemoryManager()
@contextmanager
def safe_blender_operation(operation_name: str):
    """Wrap a Blender operation with logging, timing and failure cleanup.

    Usage:
        with safe_blender_operation("创建网格"):
            ...  # perform Blender calls

    On exception the error is logged, a best-effort orphan cleanup and
    garbage collection are attempted, and the exception is re-raised.
    Elapsed time and object/mesh count deltas are logged in every case.

    Args:
        operation_name: Label used in the log messages.
    """
    def _snapshot():
        # Counts are reported as 0 when Blender is not available.
        if BLENDER_AVAILABLE:
            return len(bpy.data.objects), len(bpy.data.meshes)
        return 0, 0

    logger.info(f"开始操作: {operation_name}")
    start_time = time.time()
    initial_objects, initial_meshes = _snapshot()
    try:
        yield
    except Exception as e:
        logger.error(f"操作失败 {operation_name}: {e}")
        try:
            if memory_manager.should_cleanup():
                memory_manager.cleanup_orphaned_data()
            gc.collect()
        except Exception as cleanup_error:
            # Cleanup is best effort; the original error below matters more.
            logger.warning(f"失败后清理出错 {operation_name}: {cleanup_error}")
        raise
    finally:
        final_objects, final_meshes = _snapshot()
        elapsed_time = time.time() - start_time
        logger.info(f"操作完成: {operation_name}")
        logger.info(f"耗时: {elapsed_time:.2f}")
        logger.info(f"对象变化: {final_objects - initial_objects}")
        logger.info(f"网格变化: {final_meshes - initial_meshes}")
# ==================== 几何类扩展 ====================
class Point3d:
    """3D point, counterpart of Ruby's Geom::Point3d."""

    def __init__(self, x: float = 0.0, y: float = 0.0, z: float = 0.0):
        self.x, self.y, self.z = x, y, z

    @classmethod
    def parse(cls, value: str):
        """Build a point from "(x,y,z)" or "x,y,z"; blank input gives None.

        Each component is multiplied by 0.001 (input is assumed to be mm).
        """
        if not value or not value.strip():
            return None
        stripped = re.sub(r'[()]*', '', value)
        coords = [float(token.strip()) for token in stripped.split(',')]
        return cls(coords[0] * 0.001, coords[1] * 0.001, coords[2] * 0.001)

    def to_s(self, unit: str = "mm", digits: int = -1) -> str:
        """Format as "(x, y, z)".

        "cm" scales by 100 and always prints 3 decimals; any other unit
        scales by 1000 and prints *digits* decimals, or the plain value
        when *digits* is -1.
        """
        if unit == "cm":
            cx, cy, cz = self.x * 100, self.y * 100, self.z * 100
            return f"({cx:.3f}, {cy:.3f}, {cz:.3f})"
        mx, my, mz = self.x * 1000, self.y * 1000, self.z * 1000
        if digits == -1:
            return f"({mx}, {my}, {mz})"
        return f"({mx:.{digits}f}, {my:.{digits}f}, {mz:.{digits}f})"

    def __str__(self):
        return self.to_s()

    def __repr__(self):
        return f"Point3d({self.x}, {self.y}, {self.z})"
class Vector3d:
    """3D vector, counterpart of Ruby's Geom::Vector3d."""

    def __init__(self, x: float = 0.0, y: float = 0.0, z: float = 0.0):
        self.x, self.y, self.z = x, y, z

    @classmethod
    def parse(cls, value: str):
        """Build a vector from "(x,y,z)" or "x,y,z"; blank input gives None.

        Each component is multiplied by 0.001, same as Point3d.parse.
        """
        if not value or not value.strip():
            return None
        stripped = re.sub(r'[()]*', '', value)
        comps = [float(token.strip()) for token in stripped.split(',')]
        return cls(comps[0] * 0.001, comps[1] * 0.001, comps[2] * 0.001)

    def to_s(self, unit: str = "mm") -> str:
        """Format as "(x, y, z)": "cm" scales by 100 with 3 decimals, "in" prints the raw values, anything else scales by 1000."""
        if unit == "in":
            return f"({self.x}, {self.y}, {self.z})"
        if unit == "cm":
            cx, cy, cz = self.x * 100, self.y * 100, self.z * 100
            return f"({cx:.3f}, {cy:.3f}, {cz:.3f})"
        mx, my, mz = self.x * 1000, self.y * 1000, self.z * 1000
        return f"({mx}, {my}, {mz})"

    def normalize(self):
        """Return a unit-length copy; a zero-length vector yields Vector3d(0, 0, 0)."""
        magnitude = math.sqrt(self.x**2 + self.y**2 + self.z**2)
        if not magnitude > 0:
            return Vector3d(0, 0, 0)
        return Vector3d(self.x/magnitude, self.y/magnitude, self.z/magnitude)

    def __str__(self):
        return self.to_s()
class Transformation:
    """Origin plus three axes, counterpart of Ruby's Geom::Transformation."""

    def __init__(self, origin: Point3d = None, x_axis: Vector3d = None,
                 y_axis: Vector3d = None, z_axis: Vector3d = None):
        # Missing parts fall back to the world origin and the unit axes.
        self.origin = origin if origin else Point3d(0, 0, 0)
        self.x_axis = x_axis if x_axis else Vector3d(1, 0, 0)
        self.y_axis = y_axis if y_axis else Vector3d(0, 1, 0)
        self.z_axis = z_axis if z_axis else Vector3d(0, 0, 1)

    @classmethod
    def parse(cls, data: Dict[str, str]):
        """Build a transformation from the "o", "x", "y" and "z" entries of *data*."""
        origin = Point3d.parse(data.get("o"))
        axes = [Vector3d.parse(data.get(key)) for key in ("x", "y", "z")]
        return cls(origin, *axes)

    def store(self, data: Dict[str, str]):
        """Write the transformation into *data*: origin under "o" in mm, axes under "x"/"y"/"z" as raw values."""
        data["o"] = self.origin.to_s("mm")
        for key, axis in (("x", self.x_axis), ("y", self.y_axis), ("z", self.z_axis)):
            data[key] = axis.to_s("in")
# ==================== SUWood 材质类型常量 ====================
MAT_TYPE_NORMAL = 0
MAT_TYPE_OBVERSE = 1
MAT_TYPE_NATURE = 2
# ==================== SUWImpl 核心实现类 ====================
class SUWImpl:
"""SUWood核心实现类 - 完整翻译版本,应用内存管理最佳实践"""
_instance = None
_selected_uid = None
_selected_obj = None
_selected_zone = None
_selected_part = None
_scaled_zone = None
_server_path = None
_default_zone = None
_creation_lock = False
_mesh_creation_count = 0
_batch_operation_active = False
def __init__(self):
    """Set up empty registries, caches and batch settings for a new instance."""
    # Domain data keyed by uid / name
    self.parts, self.zones, self.hardwares = {}, {}, {}
    self.materials, self.textures = {}, {}
    # Memory management: names are stored instead of live references
    self.object_references = {}
    self.mesh_cache = {}
    self.material_cache = {}
    # Batch-operation settings
    self.deferred_updates = []
    self.batch_size = 50
    logger.info("SUWImpl 初始化完成,启用内存管理优化")
@classmethod
def get_instance(cls):
"""获取单例实例"""
if cls._instance is None:
cls._instance = cls()
return cls._instance
def startup(self):
    """Initialise collections, materials and the default zone.

    Without Blender only a warning is logged. A failure in any
    initialisation step is logged and re-raised.
    """
    logger.info("启动SUWood系统...")
    if not BLENDER_AVAILABLE:
        logger.warning("⚠️ Blender不可用使用基础模式")
        return
    try:
        self._create_layers()
        self._init_materials()
        self._init_default_zone()
    except Exception as e:
        logger.error(f"❌ SUWood系统启动失败: {e}")
        raise
    else:
        logger.info("✅ SUWood系统启动成功")
def _create_layers(self):
    """Create the SUWood collections that do not exist yet and link them to the scene."""
    if not BLENDER_AVAILABLE:
        return
    try:
        for collection_name in ["SUWood_Parts", "SUWood_Hardware", "SUWood_Zones"]:
            if collection_name in bpy.data.collections:
                continue
            created = bpy.data.collections.new(collection_name)
            bpy.context.scene.collection.children.link(created)
            logger.info(f"创建集合: {collection_name}")
    except Exception as e:
        logger.error(f"创建图层失败: {e}")
def _init_materials(self):
    """Create the default SUWood materials that are not present yet.

    Each new material gets its colour on the "Principled BSDF" node (when
    that node is found), is cached by name and registered with the memory
    manager.
    """
    if not BLENDER_AVAILABLE:
        return
    try:
        palette = [
            ("SUWood_Default", (0.8, 0.8, 0.8, 1.0)),
            ("SUWood_Wood", (0.6, 0.4, 0.2, 1.0)),
            ("SUWood_Metal", (0.7, 0.7, 0.8, 1.0)),
        ]
        for mat_name, rgba in palette:
            if mat_name in bpy.data.materials:
                continue
            material = bpy.data.materials.new(mat_name)
            material.use_nodes = True
            if material.node_tree:
                shader = material.node_tree.nodes.get("Principled BSDF")
                if shader:
                    shader.inputs[0].default_value = rgba
            # Cache the name rather than the data-block itself
            self.material_cache[mat_name] = material.name
            memory_manager.register_object(material)
            logger.info(f"创建材质: {mat_name}")
    except Exception as e:
        logger.error(f"初始化材质失败: {e}")
def _init_default_zone(self):
    """Create the default zone record and store it under zones["default"]."""
    try:
        zone = dict(
            name="Default Zone",
            id="default_zone_001",
            visible=True,
            parts={},
            hardwares={},
        )
        self._default_zone = zone
        self.zones["default"] = zone
        logger.info("初始化默认区域")
    except Exception as e:
        logger.error(f"初始化默认区域失败: {e}")
# ==================== 内存管理优化的核心方法 ====================
def validate_mesh_data(self, vertices: List, faces: List) -> bool:
    """Check that the mesh input is usable.

    Requires at least 3 vertices, at least 3 indices per face, and every
    index within ``0 <= i < len(vertices)``. Any failure (including an
    exception while checking) yields False.
    """
    try:
        vertex_total = len(vertices)
        if vertex_total < 3:
            logger.warning("顶点数量不足")
            return False
        for face in faces:
            if len(face) < 3:
                logger.warning("面顶点数量不足")
                return False
            for vertex_index in face:
                out_of_range = vertex_index >= vertex_total or vertex_index < 0
                if out_of_range:
                    logger.warning(f"无效的顶点索引: {vertex_index}")
                    return False
    except Exception as e:
        logger.error(f"网格数据验证失败: {e}")
        return False
    return True
def validate_coplanar_points(self, points: List[Tuple[float, float, float]]) -> bool:
    """Return True when all points lie on the plane of the first three.

    Fewer than four points are coplanar by definition. A point is off the
    plane when its distance to it exceeds 1e-6. Errors yield False.
    """
    if len(points) < 4:
        return True
    try:
        anchor, second, third = (mathutils.Vector(pt) for pt in points[:3])
        normal = (second - anchor).cross(third - anchor).normalized()
        tolerance = 1e-6
        off_plane = any(
            abs((mathutils.Vector(pt) - anchor).dot(normal)) > tolerance
            for pt in points[3:]
        )
        return not off_plane
    except Exception as e:
        logger.error(f"共面检查失败: {e}")
        return False
@contextmanager
def atomic_mesh_creation(self, name: str):
    """Create a mesh data-block and roll it back if the with-body fails.

    The new mesh is registered with the memory manager and yielded. When
    the body raises, objects that use the mesh are removed first (they
    would otherwise be left without data), then the mesh itself, and the
    exception is re-raised.

    Args:
        name: Requested mesh name.
    """
    mesh = None
    try:
        mesh = bpy.data.meshes.new(name)
        memory_manager.register_mesh(mesh)
        yield mesh
    except Exception as e:
        logger.error(f"原子网格创建失败: {e}")
        if mesh is not None:
            try:
                dependents = [o for o in bpy.data.objects if o.data == mesh]
            except Exception:
                # The mesh handle may already be invalid; nothing to match.
                dependents = []
            for dependent in dependents:
                self._cleanup_object_safe(dependent)
            self._cleanup_mesh_safe(mesh)
        raise
def _cleanup_mesh_safe(self, mesh):
    """Remove *mesh* from bpy.data, logging instead of raising on failure.

    Success increments the 'cleanup_calls' counter, failure increments
    'failed_cleanups'.
    """
    try:
        if mesh and mesh.name in bpy.data.meshes:
            # Read the name first: the handle is invalid after removal, and
            # touching it then would make a successful cleanup look failed.
            mesh_name = mesh.name
            bpy.data.meshes.remove(mesh, do_unlink=True)
            memory_manager.creation_stats['cleanup_calls'] += 1
            logger.info(f"清理网格: {mesh_name}")
    except Exception as e:
        logger.error(f"网格清理失败: {e}")
        memory_manager.creation_stats['failed_cleanups'] += 1
def _cleanup_object_safe(self, obj):
    """Unlink *obj* from its collections and remove it from bpy.data.

    Errors are logged, not raised. Success increments 'cleanup_calls',
    failure increments 'failed_cleanups'.
    """
    try:
        if obj and obj.name in bpy.data.objects:
            # Read the name first: the handle is invalid after removal.
            obj_name = obj.name
            # Snapshot the collections; unlinking changes users_collection.
            for collection in list(obj.users_collection):
                collection.objects.unlink(obj)
            bpy.data.objects.remove(obj, do_unlink=True)
            memory_manager.creation_stats['cleanup_calls'] += 1
            logger.info(f"清理对象: {obj_name}")
    except Exception as e:
        logger.error(f"对象清理失败: {e}")
        memory_manager.creation_stats['failed_cleanups'] += 1
def _create_single_mesh_atomic(self, container: Any, surface: Dict[str, Any],
                               color: str = None, scale: float = None, angle: float = None,
                               series: List = None, uid: str = None, zid: Any = None,
                               pid: Any = None, child: Any = None, options: Dict = None) -> Any:
    """Build one face object from ``surface["segs"]``.

    The first element of each segment is parsed as a point. The face is
    rejected (None) when fewer than 3 points parse, when its area is below
    1e-6, or when index validation fails. Mesh and object are created
    inside ``atomic_mesh_creation`` so a failure rolls the mesh back.

    Args:
        container: Target collection; anything without ``objects`` falls
            back to the scene collection.
        surface: Dict with a "segs" list.
        color: Material key; no material is applied when falsy.
        uid, zid, pid, child: Identifiers stored as custom properties.
        scale, angle, series, options: Accepted but unused here.

    Returns:
        The created object, or None on rejection or error.
    """
    with safe_blender_operation(f"创建网格 uid={uid}"):
        try:
            self.__class__._mesh_creation_count += 1
            creation_count = self.__class__._mesh_creation_count
            # Periodic housekeeping every 100 creations
            if creation_count % 100 == 0:
                logger.info(f"执行定期清理,当前创建数: {creation_count}")
                memory_manager.cleanup_orphaned_data()
                gc.collect()

            # Collect the start point of every well-formed segment
            points = []
            for seg in surface.get("segs", []):
                if not (isinstance(seg, list) and len(seg) >= 1):
                    continue
                point_str = seg[0] if isinstance(seg[0], str) else str(seg[0])
                try:
                    point = Point3d.parse(point_str)
                except Exception:
                    logger.warning(f"点解析失败: {point_str}")
                    continue
                if point:
                    points.append(point)
            if len(points) < 3:
                logger.warning(f"有效点数不足: {len(points)}")
                return None

            area = self._calculate_face_area(points)
            if area < 1e-6:
                logger.warning(f"面积过小: {area}")
                return None

            vertices = [(float(p.x), float(p.y), float(p.z)) for p in points]
            # A single n-gon through all vertices, in input order
            faces = [list(range(len(vertices)))]
            if not self.validate_mesh_data(vertices, faces):
                logger.warning("网格数据验证失败")
                return None

            timestamp = hash(
                str(points[0]) + str(uid) + str(child) + str(creation_count))
            mesh_name = f"SUWMesh_{abs(timestamp) % 10000}"
            with self.atomic_mesh_creation(mesh_name) as mesh:
                mesh.from_pydata(vertices, [], faces)
                try:
                    uv_layer = mesh.uv_layers.new(name="UVMap")
                    self._generate_uv_coordinates(mesh, uv_layer)
                except Exception as e:
                    # UVs are optional; keep the geometry
                    logger.warning(f"UV坐标创建失败: {e}")
                if not self._safe_mesh_update(mesh):
                    raise RuntimeError("网格更新失败")

                obj_name = f"SUWObj_{abs(timestamp) % 10000}"
                obj = bpy.data.objects.new(obj_name, mesh)
                memory_manager.register_object(obj)
                if hasattr(container, 'objects'):
                    container.objects.link(obj)
                else:
                    bpy.context.scene.collection.objects.link(obj)

                self._set_object_attributes_safe(
                    obj, uid, zid, pid, child, area, creation_count)
                if color:
                    self._apply_material_safe(obj, color)

                # Key by the final name: it can differ from obj_name when
                # the requested name was already taken, and later lookups
                # go through bpy.data.objects by this key.
                self.object_references[obj.name] = {
                    'uid': uid,
                    'type': 'mesh',
                    'creation_time': time.time()
                }
                logger.info(f"✅ 网格创建成功: {obj.name}, {len(vertices)}顶点")
                return obj
        except Exception as e:
            logger.error(f"网格创建失败: {e}")
            return None
def _generate_uv_coordinates(self, mesh, uv_layer):
"""生成UV坐标"""
try:
for poly in mesh.polygons:
for i, loop_index in enumerate(poly.loop_indices):
vertex_count = len(poly.vertices)
if vertex_count == 4: # 四边形
uv_coords = [(0, 0), (1, 0), (1, 1), (0, 1)]
uv_layer.data[loop_index].uv = uv_coords[i % 4]
elif vertex_count == 3: # 三角形
uv_coords = [(0, 0), (1, 0), (0.5, 1)]
uv_layer.data[loop_index].uv = uv_coords[i % 3]
else: # 其他多边形
angle = (i / vertex_count) * 2 * math.pi
u = (math.cos(angle) + 1) * 0.5
v = (math.sin(angle) + 1) * 0.5
uv_layer.data[loop_index].uv = (u, v)
except Exception as e:
logger.error(f"UV坐标生成失败: {e}")
def _safe_mesh_update(self, mesh, max_retries=3):
    """Call ``mesh.update()`` with retries.

    Returns False at once when the mesh is missing from bpy.data. An
    update that leaves no vertices counts as a failed attempt. Failed
    attempts are retried after 0.1 s, up to *max_retries* in total.
    """
    attempt = 0
    while attempt < max_retries:
        try:
            still_valid = mesh and mesh.name in bpy.data.meshes
            if not still_valid:
                return False
            mesh.update()
            if not mesh.vertices:
                raise RuntimeError("网格更新后无顶点")
            return True
        except Exception as e:
            logger.warning(f"网格更新失败,尝试 {attempt + 1}/{max_retries}: {e}")
            if attempt >= max_retries - 1:
                return False
            time.sleep(0.1)  # brief pause before the next attempt
        attempt += 1
    return False
def _set_object_attributes_safe(self, obj, uid, zid, pid, child, area, creation_count):
"""安全设置对象属性"""
try:
obj["uid"] = str(uid) if uid else "unknown"
obj["zid"] = str(zid) if zid else "unknown"
obj["pid"] = str(pid) if pid else "unknown"
obj["child"] = str(child) if child else "unknown"
obj["area"] = float(area) if area else 0.0
obj["creation_index"] = creation_count
obj["creation_time"] = time.time()
obj["memory_managed"] = True
except Exception as e:
logger.error(f"设置对象属性失败: {e}")
def _apply_material_safe(self, obj, color):
    """Assign the material for *color* to *obj*.

    A material found by ``get_texture`` replaces slot 0 (or is appended
    when the object has no slots). Otherwise, or on error, a simple
    material is created via ``_apply_simple_material``. Never raises.
    """
    try:
        material = self.get_texture(color)
        if material:
            if obj.data.materials:
                obj.data.materials[0] = material
            else:
                obj.data.materials.append(material)
            logger.info(f"材质应用成功: {color}")
        else:
            self._apply_simple_material(obj, color)
    except Exception as e:
        logger.error(f"材质应用失败: {e}")
        try:
            self._apply_simple_material(obj, color)
        except Exception as fallback_error:
            # Still best effort, but the failure is no longer invisible.
            logger.error(f"简单材质应用失败: {fallback_error}")
def _apply_simple_material(self, obj: Any, color: str):
    """Append a flat-colour material named ``SUWMat_<color>`` to *obj*.

    A cached material that still exists is reused. Otherwise a new one is
    created; *color* is read as "RRGGBB" hex, and light grey is used when
    it is missing, too short or not valid hex. Errors are logged.
    """
    default_rgba = (0.8, 0.8, 0.8, 1.0)
    try:
        mat_name = f"SUWMat_{color}" if color else "SUWMat_default"
        if mat_name in self.material_cache:
            material_name = self.material_cache[mat_name]
            if material_name in bpy.data.materials:
                obj.data.materials.append(bpy.data.materials[material_name])
                return

        material = bpy.data.materials.new(mat_name)
        material.use_nodes = True
        if material.node_tree and material.node_tree.nodes:
            principled = material.node_tree.nodes.get("Principled BSDF")
            if principled:
                rgba = default_rgba
                if color and len(color) >= 6:
                    try:
                        rgba = (
                            int(color[0:2], 16) / 255.0,
                            int(color[2:4], 16) / 255.0,
                            int(color[4:6], 16) / 255.0,
                            1.0,
                        )
                    except (ValueError, TypeError):
                        # Not a hex colour: keep the default grey
                        rgba = default_rgba
                principled.inputs[0].default_value = rgba

        # Cache the name rather than the data-block itself
        self.material_cache[mat_name] = material.name
        memory_manager.register_object(material)
        obj.data.materials.append(material)
        logger.info(f"简单材质创建并应用: {mat_name}")
    except Exception as e:
        logger.error(f"简单材质应用失败: {e}")
# ==================== 优化的删除和清理方法 ====================
def _force_delete_objects_optimized(self, objects_to_delete: List[Any]):
    """Delete the given objects from bpy.data and drop their references.

    Names are collected first (duplicates and already-removed handles are
    skipped), then each object is looked up again by name, unlinked and
    removed. The dependency graph is refreshed if anything was deleted.
    Errors are logged, not raised.
    """
    try:
        if not BLENDER_AVAILABLE:
            return
        logger.info(f"开始优化删除 {len(objects_to_delete)} 个对象")

        object_names = []
        seen = set()
        for obj in objects_to_delete:
            try:
                if not (obj and hasattr(obj, 'name')):
                    continue
                obj_name = obj.name
            except ReferenceError:
                # Handle to an already removed data-block
                continue
            if obj_name not in seen:
                seen.add(obj_name)
                object_names.append(obj_name)

        deleted_count = 0
        for obj_name in object_names:
            try:
                if obj_name in bpy.data.objects:
                    obj = bpy.data.objects[obj_name]
                    for collection in list(obj.users_collection):
                        try:
                            collection.objects.unlink(obj)
                        except Exception as unlink_error:
                            # remove(do_unlink=True) below still detaches it
                            logger.debug(f"解除集合链接失败 {obj_name}: {unlink_error}")
                    bpy.data.objects.remove(obj, do_unlink=True)
                    deleted_count += 1
                # A reference to a missing object is stale either way
                self.object_references.pop(obj_name, None)
            except Exception as e:
                logger.error(f"删除对象失败 {obj_name}: {e}")

        if deleted_count > 0:
            self._update_dependency_graph(full_update=True)
        logger.info(f"✅ 优化删除完成: {deleted_count} 个对象")
    except Exception as e:
        logger.error(f"优化删除失败: {e}")
def _update_dependency_graph(self, full_update: bool = False):
    """Refresh Blender's view layer; optionally the depsgraph and UI too.

    Args:
        full_update: When True, also update the evaluated dependency
            graph and tag 3D-view and outliner areas for redraw.
    """
    try:
        if not BLENDER_AVAILABLE:
            return
        if not full_update:
            # Quick path: view layer only
            bpy.context.view_layer.update()
            return

        logger.info("执行全局依赖图更新")
        bpy.context.view_layer.update()
        bpy.context.evaluated_depsgraph_get().update()
        try:
            # There may be no screen at all (e.g. no UI context)
            screen = getattr(bpy.context, "screen", None)
            if screen is not None:
                for area in screen.areas:
                    if area.type in ['VIEW_3D', 'OUTLINER']:
                        area.tag_redraw()
        except Exception as redraw_error:
            # Redraw is cosmetic; never fail the update because of it
            logger.debug(f"视图刷新失败: {redraw_error}")
        logger.info("全局依赖图更新完成")
    except Exception as e:
        logger.error(f"依赖图更新失败: {e}")
# ==================== 保持原有业务逻辑的方法 ====================
def get_zones(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""获取区域信息"""
return self.zones
def get_parts(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""获取零件信息"""
return self.parts
def get_hardwares(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""获取硬件信息"""
return self.hardwares
def get_texture(self, key: str):
    """Look up a material for *key*.

    The name cache is tried first, then the first material whose name
    contains *key* (which is cached for next time).

    Returns:
        The material, or None when Blender is unavailable, *key* is
        empty, nothing matches, or an error occurs.
    """
    if not BLENDER_AVAILABLE:
        return None
    if not key:
        # "" is a substring of every name and would match an arbitrary
        # material in the scan below.
        return None
    try:
        if key in self.material_cache:
            material_name = self.material_cache[key]
            if material_name in bpy.data.materials:
                return bpy.data.materials[material_name]
        for material in bpy.data.materials:
            if key in material.name:
                self.material_cache[key] = material.name
                return material
        return None
    except Exception as e:
        logger.error(f"获取纹理失败: {e}")
        return None
def create_face(self, container: Any, surface: Dict[str, Any], color: str = None,
                scale: float = None, angle: float = None, series: List = None,
                uid: str = None, zid: Any = None, pid: Any = None,
                child: Any = None) -> Any:
    """Create one face object by delegating to ``_create_single_mesh_atomic``.

    Returns the created object, or None if the delegate returns None or
    raises.
    """
    try:
        face_obj = self._create_single_mesh_atomic(
            container, surface, color, scale, angle,
            series, uid, zid, pid, child
        )
    except Exception as e:
        logger.error(f"创建面失败: {e}")
        return None
    return face_obj
def _calculate_face_area(self, points: List[Point3d]) -> float:
"""计算面积"""
try:
if len(points) < 3:
return 0.0
# 使用三角剖分计算面积
area = 0.0
for i in range(1, len(points) - 1):
p1 = points[0]
p2 = points[i]
p3 = points[i + 1]
# 计算三角形面积
v1 = Vector3d(p2.x - p1.x, p2.y - p1.y, p2.z - p1.z)
v2 = Vector3d(p3.x - p1.x, p3.y - p1.y, p3.z - p1.z)
# 叉积的模长的一半就是三角形面积
cross_x = v1.y * v2.z - v1.z * v2.y
cross_y = v1.z * v2.x - v1.x * v2.z
cross_z = v1.x * v2.y - v1.y * v2.x
triangle_area = 0.5 * \
math.sqrt(cross_x**2 + cross_y**2 + cross_z**2)
area += triangle_area
return area
except Exception as e:
logger.error(f"计算面积失败: {e}")
return 0.0
# ==================== 系统管理方法 ====================
def get_creation_stats(self) -> Dict[str, Any]:
    """Return creation counters and cache sizes, plus bpy.data totals when Blender is available.

    On error a dict with a single "error" entry is returned.
    """
    try:
        stats = {}
        stats["creation_count"] = self.__class__._mesh_creation_count
        stats["object_references"] = len(self.object_references)
        stats["mesh_cache_size"] = len(self.mesh_cache)
        stats["material_cache_size"] = len(self.material_cache)
        stats["memory_manager_stats"] = memory_manager.creation_stats.copy()
        stats["blender_available"] = BLENDER_AVAILABLE
        if BLENDER_AVAILABLE:
            stats.update(
                total_objects=len(bpy.data.objects),
                total_meshes=len(bpy.data.meshes),
                total_materials=len(bpy.data.materials),
            )
        return stats
    except Exception as e:
        logger.error(f"获取统计信息失败: {e}")
        return {"error": str(e)}
def force_cleanup(self):
    """Purge orphaned data, clear the mesh cache and drop references older than one hour.

    Also runs the garbage collector and, with Blender available, a full
    dependency-graph update. Errors are logged, not raised.
    """
    try:
        logger.info("开始强制清理")
        cleanup_count = memory_manager.cleanup_orphaned_data()
        self.mesh_cache.clear()

        # References older than 3600 s are treated as expired
        now = time.time()
        expired_refs = [
            name
            for name, info in self.object_references.items()
            if now - info.get('creation_time', 0) > 3600
        ]
        for name in expired_refs:
            del self.object_references[name]

        gc.collect()
        if BLENDER_AVAILABLE:
            self._update_dependency_graph(full_update=True)
        logger.info(
            f"强制清理完成: 清理了 {cleanup_count} 个数据块,{len(expired_refs)} 个过期引用")
    except Exception as e:
        logger.error(f"强制清理失败: {e}")
def diagnose_system_state(self):
    """Log current statistics and return a list of detected problems.

    With Blender available it checks for meshes without users, meshes
    without vertices, and a total vertex count above 1,000,000. On error
    the list holds a single failure message.
    """
    try:
        logger.info("=== 系统状态诊断 ===")
        for key, value in self.get_creation_stats().items():
            logger.info(f"{key}: {value}")

        issues = []
        if BLENDER_AVAILABLE:
            orphan_total = sum(1 for m in bpy.data.meshes if m.users == 0)
            if orphan_total:
                issues.append(f"发现 {orphan_total} 个孤立网格")
            empty_total = sum(1 for m in bpy.data.meshes if not m.vertices)
            if empty_total:
                issues.append(f"发现 {empty_total} 个空网格")
            total_vertices = sum(len(m.vertices) for m in bpy.data.meshes)
            if total_vertices > 1000000:
                issues.append(f"顶点数量过多: {total_vertices}")

        if not issues:
            logger.info("✅ 系统状态正常")
        else:
            logger.warning("发现问题:")
            for issue in issues:
                logger.warning(f" - {issue}")
        return issues
    except Exception as e:
        logger.error(f"系统诊断失败: {e}")
        return [f"诊断失败: {e}"]
# ==================== 其他必要的方法存根 ====================
def c09(self, data: Dict[str, Any]):
    """Delete the objects belonging to ``data["uid"]`` and clean up stored data.

    Reads "uid", "typ" (default "uid") and "oid" from *data*. Errors are
    logged, not raised.
    """
    try:
        uid, typ, oid = data.get("uid"), data.get("typ", "uid"), data.get("oid")
        logger.info(f"执行删除操作: uid={uid}, typ={typ}, oid={oid}")
        targets = self._find_objects_for_deletion(uid, typ, oid)
        if targets:
            self._force_delete_objects_optimized(targets)
        # Internal records are cleaned whether or not objects were found
        self._safe_data_cleanup(uid, typ, oid)
    except Exception as e:
        logger.error(f"删除操作失败: {e}")
def _find_objects_for_deletion(self, uid: str, typ: str, oid: Any) -> List[Any]:
    """Collect the Blender objects that belong to *uid*.

    Objects are found through ``object_references`` and through the "uid"
    custom property; each object is returned once. *typ* and *oid* are
    accepted but not used.

    Returns:
        The matching objects; an empty list when Blender is unavailable,
        *uid* is None, or an error occurs.
    """
    try:
        if not BLENDER_AVAILABLE:
            return []
        if uid is None:
            # obj.get("uid") is None for every object without the property,
            # so a None uid would select unrelated objects.
            logger.info("找到 0 个要删除的对象")
            return []

        found = {}
        for obj_name, ref_info in self.object_references.items():
            if ref_info.get('uid') == uid and obj_name in bpy.data.objects:
                found[obj_name] = bpy.data.objects[obj_name]

        # The property is written as str(uid); accept both spellings
        uid_text = str(uid)
        for obj in bpy.data.objects:
            tagged = obj.get("uid")
            if tagged is not None and (tagged == uid or tagged == uid_text):
                found.setdefault(obj.name, obj)

        objects_to_delete = list(found.values())
        logger.info(f"找到 {len(objects_to_delete)} 个要删除的对象")
        return objects_to_delete
    except Exception as e:
        logger.error(f"查找删除对象失败: {e}")
        return []
def _safe_data_cleanup(self, uid: str, typ: str, oid: Any):
    """Drop *uid* from parts, zones and hardwares, and remove its object references.

    *typ* and *oid* are accepted but not used. Errors are logged.
    """
    try:
        cleanup_count = 0
        for registry in (self.parts, self.zones, self.hardwares):
            if uid in registry:
                del registry[uid]
                cleanup_count += 1

        # Collect first; the dict must not change while it is iterated
        stale_names = [
            name
            for name, info in self.object_references.items()
            if info.get('uid') == uid
        ]
        for name in stale_names:
            del self.object_references[name]
            cleanup_count += 1
        logger.info(f"数据清理完成: {cleanup_count}")
    except Exception as e:
        logger.error(f"数据清理失败: {e}")
# ==================== 其他业务逻辑方法的存根 ====================
def c00(self, data: Dict[str, Any]):
    """Placeholder command handler; does nothing and returns None."""
    return None
def c01(self, data: Dict[str, Any]):
    """Create a zone for ``data["uid"]``.

    With Blender available, a collection named ``Zone_<uid>`` is created
    and linked to the scene unless it already exists. The zone record is
    stored in ``self.zones`` either way. Errors are logged.
    """
    try:
        uid = data.get("uid")
        zone_data = data.get("zone", {})
        with safe_blender_operation(f"创建区域 uid={uid}"):
            collection_name = f"Zone_{uid}"
            if BLENDER_AVAILABLE and collection_name not in bpy.data.collections:
                zone_collection = bpy.data.collections.new(collection_name)
                bpy.context.scene.collection.children.link(zone_collection)
                memory_manager.register_object(zone_collection)
            record = {
                "collection_name": collection_name,
                "data": zone_data,
                "parts": {},
                "creation_time": time.time()
            }
            self.zones[uid] = record
            logger.info(f"创建区域成功: {uid}")
    except Exception as e:
        logger.error(f"创建区域失败: {e}")
def c02(self, data: Dict[str, Any]):
    """Create part collections for ``data["uid"]``.

    Each entry of ``data["parts"]`` gets a collection named
    ``Part_<uid>_<part_id>``. A new collection is linked under the zone's
    collection when that exists, otherwise under the scene collection, so
    it is never left without a parent. Part records are stored in
    ``self.parts[uid]``. Per-part errors are logged and skipped.
    """
    try:
        uid = data.get("uid")
        parts_data = data.get("parts", {})
        with safe_blender_operation(f"创建零件 uid={uid}"):
            created_parts = {}
            for part_id, part_data in parts_data.items():
                try:
                    collection_name = f"Part_{uid}_{part_id}"
                    if BLENDER_AVAILABLE and collection_name not in bpy.data.collections:
                        collection = bpy.data.collections.new(collection_name)
                        parent_collection = None
                        if uid in self.zones:
                            zone_collection_name = self.zones[uid].get("collection_name")
                            if zone_collection_name in bpy.data.collections:
                                parent_collection = bpy.data.collections[zone_collection_name]
                        if parent_collection is None:
                            # No usable zone collection: fall back to the scene
                            parent_collection = bpy.context.scene.collection
                        parent_collection.children.link(collection)
                        memory_manager.register_object(collection)
                    created_parts[part_id] = {
                        "collection_name": collection_name,
                        "data": part_data,
                        "creation_time": time.time()
                    }
                except Exception as e:
                    logger.error(f"创建零件失败 {part_id}: {e}")
            self.parts[uid] = created_parts
            logger.info(f"创建零件成功: {uid}, {len(created_parts)} 个零件")
    except Exception as e:
        logger.error(f"创建零件操作失败: {e}")
def c03(self, data: Dict[str, Any]):
    """Create hardware collections for ``data["uid"]``.

    Each entry of ``data["hardwares"]`` gets a collection named
    ``Hardware_<uid>_<hw_id>`` linked to the scene (when Blender is
    available and it does not exist yet). Records are stored in
    ``self.hardwares[uid]``. Per-item errors are logged and skipped.
    """
    try:
        uid = data.get("uid")
        hardware_data = data.get("hardwares", {})
        with safe_blender_operation(f"创建硬件 uid={uid}"):
            created_hardware = {}
            for hw_id, hw_data in hardware_data.items():
                try:
                    collection_name = f"Hardware_{uid}_{hw_id}"
                    missing = BLENDER_AVAILABLE and collection_name not in bpy.data.collections
                    if missing:
                        hw_collection = bpy.data.collections.new(collection_name)
                        bpy.context.scene.collection.children.link(hw_collection)
                        memory_manager.register_object(hw_collection)
                    record = {
                        "collection_name": collection_name,
                        "data": hw_data,
                        "creation_time": time.time()
                    }
                    created_hardware[hw_id] = record
                except Exception as e:
                    logger.error(f"创建硬件失败 {hw_id}: {e}")
            self.hardwares[uid] = created_hardware
            logger.info(f"创建硬件成功: {uid}, {len(created_hardware)} 个硬件")
    except Exception as e:
        logger.error(f"创建硬件操作失败: {e}")
def c04(self, data: Dict[str, Any]):
    """Create the faces and edges of every part in ``data["parts"]``.

    Geometry goes into the part's ``Part_<uid>_<part_id>`` collection when
    it exists, otherwise into the scene collection. Only objects that
    were actually created are counted; when more than 50 were created an
    orphan cleanup is triggered. Errors are logged per surface, edge and
    part, and never raised.
    """
    try:
        uid = data.get("uid")
        parts_data = data.get("parts", {})
        with safe_blender_operation(f"创建面和边 uid={uid}"):
            total_created = 0
            for part_id, part_data in parts_data.items():
                try:
                    # Resolve the target once per part; stays None without Blender
                    target = None
                    if BLENDER_AVAILABLE:
                        collection_name = f"Part_{uid}_{part_id}"
                        if collection_name in bpy.data.collections:
                            target = bpy.data.collections[collection_name]
                        else:
                            target = bpy.context.scene.collection

                    for surface in part_data.get("surfaces", []):
                        try:
                            obj = self.create_face(
                                target,
                                surface,
                                surface.get("color"),
                                surface.get("scale"),
                                surface.get("angle"),
                                uid=uid,
                                pid=part_id
                            )
                            if obj:
                                total_created += 1
                        except Exception as e:
                            logger.error(f"创建面失败: {e}")

                    for edge in part_data.get("edges", []):
                        try:
                            # create_edges returns only the edges it built
                            total_created += len(self.create_edges(target, [edge]))
                        except Exception as e:
                            logger.error(f"创建边失败: {e}")
                except Exception as e:
                    logger.error(f"处理零件失败 {part_id}: {e}")
            logger.info(f"创建面和边完成: {total_created} 个对象")
            if total_created > 50:
                memory_manager.cleanup_orphaned_data()
    except Exception as e:
        logger.error(f"创建面和边操作失败: {e}")
def create_edges(self, container: Any, edges_data: List) -> List:
    """Create one line object per entry of *edges_data*.

    Each entry supplies "start" and "end" point strings (default
    "0,0,0"). Entries that fail are logged and skipped.

    Returns:
        The created edge objects; empty when Blender is unavailable.
    """
    try:
        if not BLENDER_AVAILABLE:
            return []
        created_edges = []
        for edge_data in edges_data or []:
            try:
                start_point = Point3d.parse(edge_data.get("start", "0,0,0"))
                end_point = Point3d.parse(edge_data.get("end", "0,0,0"))
                if not (start_point and end_point):
                    continue
                edge_obj = self._create_line_edge(
                    container, start_point, end_point)
                if edge_obj:
                    # _create_line_edge already registers the object with
                    # the memory manager; registering again would count it
                    # twice in the creation stats.
                    created_edges.append(edge_obj)
            except Exception as e:
                logger.error(f"创建单条边失败: {e}")
        return created_edges
    except Exception as e:
        logger.error(f"创建边失败: {e}")
        return []
def _create_line_edge(self, container: Any, start: Point3d, end: Point3d) -> Any:
    """Create a two-vertex edge object from *start* to *end*.

    The object is linked to *container* when it has ``objects``, otherwise
    to the scene collection. On failure, whatever was created so far is
    removed again.

    Returns:
        The new object, or None when Blender is unavailable or on error.
    """
    if not BLENDER_AVAILABLE:
        return None
    mesh = None
    obj = None
    try:
        # One suffix shared by the mesh and object names
        suffix = hash(str(start) + str(end)) % 10000
        mesh = bpy.data.meshes.new(f"SUWEdge_{suffix}")
        memory_manager.register_mesh(mesh)
        vertices = [
            (start.x, start.y, start.z),
            (end.x, end.y, end.z)
        ]
        mesh.from_pydata(vertices, [(0, 1)], [])
        mesh.update()

        obj = bpy.data.objects.new(f"SUWEdgeObj_{suffix}", mesh)
        memory_manager.register_object(obj)
        if hasattr(container, 'objects'):
            container.objects.link(obj)
        else:
            bpy.context.scene.collection.objects.link(obj)
        return obj
    except Exception as e:
        logger.error(f"创建线边失败: {e}")
        # Roll back: object first, so the mesh is not left attached to it
        if obj is not None:
            self._cleanup_object_safe(obj)
        if mesh is not None:
            self._cleanup_mesh_safe(mesh)
        return None
# ==================== 选择和可见性管理 ====================
def sel_clear(self):
    """Deselect everything in Blender (when available) and reset the stored selection state."""
    try:
        if BLENDER_AVAILABLE:
            bpy.ops.object.select_all(action='DESELECT')
        for attr in ("_selected_uid", "_selected_obj",
                     "_selected_zone", "_selected_part"):
            setattr(self, attr, None)
        logger.info("选择已清除")
    except Exception as e:
        logger.error(f"清除选择失败: {e}")
def sel_local(self, obj: Any):
    """Make *obj* the only selected, active object and remember its name.

    Does nothing when Blender is unavailable or *obj* is falsy. Errors
    are logged, not raised.
    """
    try:
        if not (BLENDER_AVAILABLE and obj):
            return
        bpy.ops.object.select_all(action='DESELECT')
        if obj.name in bpy.data.objects:
            bpy.data.objects[obj.name].select_set(True)
            bpy.context.view_layer.objects.active = bpy.data.objects[obj.name]
        # The name is stored rather than the object reference
        self._selected_obj = obj.name
        logger.info(f"选择对象: {obj.name}")
    except Exception as e:
        logger.error(f"选择对象失败: {e}")
# ==================== 材质和纹理管理 ====================
def add_mat_rgb(self, mat_id: str, alpha: float, r: int, g: int, b: int):
    """Create (or fetch from cache) a flat RGB material named *mat_id*.

    Args:
        mat_id: Material name and cache key.
        alpha: Opacity; values below 1.0 switch the material to blending.
        r, g, b: Colour channels, divided by 255 before use.

    Returns:
        The material, or None when Blender is unavailable or on error.
    """
    try:
        if not BLENDER_AVAILABLE:
            return
        if mat_id in self.material_cache:
            material_name = self.material_cache[mat_id]
            if material_name in bpy.data.materials:
                return bpy.data.materials[material_name]

        material = bpy.data.materials.new(mat_id)
        material.use_nodes = True
        if material.node_tree:
            principled = material.node_tree.nodes.get("Principled BSDF")
            if principled:
                color = (r/255.0, g/255.0, b/255.0, alpha)
                principled.inputs[0].default_value = color
                if alpha < 1.0:
                    material.blend_method = 'BLEND'
                    # Look the socket up by name: its position in the
                    # input list is not a stable index.
                    alpha_socket = principled.inputs.get("Alpha")
                    if alpha_socket is not None:
                        alpha_socket.default_value = alpha

        # Cache the name rather than the data-block itself
        self.material_cache[mat_id] = material.name
        memory_manager.register_object(material)
        logger.info(f"创建RGB材质: {mat_id}")
        return material
    except Exception as e:
        logger.error(f"创建RGB材质失败: {e}")
        return None
# ==================== 系统监控和维护 ====================
def monitor_memory_usage(self):
    """Run periodic cleanup and react to a large number of data-blocks.

    Does nothing when the Blender API is unavailable. If
    ``memory_manager.should_cleanup()`` is true, orphaned data is cleaned
    up and the garbage collector is run. If ``bpy.data`` then holds more
    than 10000 objects or more than 5000 meshes, a warning is logged and
    ``self.force_cleanup()`` is called. Any exception is logged and
    suppressed.
    """
    try:
        if not BLENDER_AVAILABLE:
            return

        # Scheduled cleanup.
        if memory_manager.should_cleanup():
            logger.info("触发定期内存清理")
            removed = memory_manager.cleanup_orphaned_data()
            if removed > 0:
                logger.info(f"清理了 {removed} 个孤立数据")
            gc.collect()

        # Pressure check based on data-block counts.
        object_count = len(bpy.data.objects)
        mesh_count = len(bpy.data.meshes)
        under_pressure = object_count > 10000 or mesh_count > 5000
        if under_pressure:
            logger.warning(
                f"内存压力警告: {object_count} 个对象, {mesh_count} 个网格")
            self.force_cleanup()
    except Exception as err:
        logger.error(f"内存监控失败: {err}")
def get_memory_report(self) -> Dict[str, Any]:
    """Assemble a dictionary describing the current memory state.

    Returns:
        A dict with the keys ``timestamp``, ``creation_stats``,
        ``system_diagnosis`` and ``memory_manager``; when the Blender API
        is available, also ``blender_data`` with data-block counts. If
        anything raises, the error is logged and ``{"error": <message>}``
        is returned instead.
    """
    try:
        # Gather the sections in the same order the report lists them.
        timestamp = time.time()
        creation_stats = self.get_creation_stats()
        diagnosis = self.diagnose_system_state()
        manager_section = {
            "object_registry_size": len(memory_manager.object_registry),
            "mesh_registry_size": len(memory_manager.mesh_registry),
            "last_cleanup": memory_manager.last_cleanup_time,
            "cleanup_interval": memory_manager.cleanup_interval,
        }
        report = {
            "timestamp": timestamp,
            "creation_stats": creation_stats,
            "system_diagnosis": diagnosis,
            "memory_manager": manager_section,
        }

        if BLENDER_AVAILABLE:
            counts = {}
            for collection_name in ("objects", "meshes", "materials",
                                    "textures", "images"):
                counts[collection_name] = len(
                    getattr(bpy.data, collection_name))
            report["blender_data"] = counts

        return report
    except Exception as err:
        logger.error(f"生成内存报告失败: {err}")
        return {"error": str(err)}
# ==================== 属性访问器 ====================
@property
def selected_uid(self):
    """Read-only view of ``_selected_uid``."""
    return self._selected_uid
@property
def selected_zone(self):
    """Read-only view of ``_selected_zone``."""
    return self._selected_zone
@property
def selected_part(self):
    """Read-only view of ``_selected_part``."""
    return self._selected_part
@property
def selected_obj(self):
    """Read-only view of ``_selected_obj``.

    Per the original note, the stored value is an object name rather than
    an object reference, so that no dangling reference is kept.
    """
    return self._selected_obj
@property
def server_path(self):
    """Read-only view of ``_server_path``."""
    return self._server_path
@property
def default_zone(self):
    """Read-only view of ``_default_zone``."""
    return self._default_zone
# ==================== 兼容性方法存根 ====================
def c05(self, data: Dict[str, Any]):
    """Compatibility stub for the ``c05`` command.

    Args:
        data: Command payload; accepted but not read.

    Only writes an info-level log entry.
    """
    logger.info("执行c05操作")
def c06(self, data: Dict[str, Any]):
    """Compatibility stub for the ``c06`` command.

    Args:
        data: Command payload; accepted but not read.

    Only writes an info-level log entry.
    """
    logger.info("执行c06操作")
def c07(self, data: Dict[str, Any]):
    """Compatibility stub for the ``c07`` command.

    Args:
        data: Command payload; accepted but not read.

    Only writes an info-level log entry.
    """
    logger.info("执行c07操作")
def c08(self, data: Dict[str, Any]):
    """Compatibility stub for the ``c08`` command.

    Args:
        data: Command payload; accepted but not read.

    Only writes an info-level log entry.
    """
    logger.info("执行c08操作")
def c10(self, data: Dict[str, Any]):
    """Store door information for a uid.

    Args:
        data: Payload read via ``data.get("uid")`` and
            ``data.get("drs", [])``.

    The door list is saved under ``self.parts[uid]["doors"]``; an empty
    entry for ``uid`` is created first when none exists. Any exception is
    logged and suppressed.
    """
    try:
        uid = data.get("uid")
        door_list = data.get("drs", [])
        logger.info(f"设置门信息: uid={uid}, {len(door_list)} 个门")
        # Create the per-uid entry on demand, then attach the doors.
        self.parts.setdefault(uid, {})["doors"] = door_list
    except Exception as err:
        logger.error(f"设置门信息失败: {err}")
def show_message(self, data: Dict[str, Any]):
    """Log the ``"message"`` entry of ``data`` at info level.

    Args:
        data: Payload; a missing ``"message"`` key is treated as an empty
            string.

    Any exception is logged and suppressed.
    """
    try:
        text = data.get("message", "")
        logger.info(f"显示消息: {text}")
    except Exception as err:
        logger.error(f"显示消息失败: {err}")
def set_config(self, data: Dict[str, Any]):
    """Apply configuration values found under ``data["config"]``.

    Args:
        data: Payload; a missing ``"config"`` key is treated as an empty
            dict.

    Recognised keys:
        ``memory_cleanup_interval`` is assigned to
        ``memory_manager.cleanup_interval``; ``batch_size`` is assigned to
        ``self.batch_size``. Other keys are ignored. Any exception is
        logged and suppressed.
    """
    try:
        settings = data.get("config", {})
        logger.info(f"设置配置: {len(settings)} 个配置项")

        # Memory-management related options.
        if "memory_cleanup_interval" in settings:
            interval = settings["memory_cleanup_interval"]
            memory_manager.cleanup_interval = interval
        if "batch_size" in settings:
            self.batch_size = settings["batch_size"]
    except Exception as err:
        logger.error(f"设置配置失败: {err}")
# ==================== 清理和销毁 ====================
def shutdown(self):
    """Shut the system down: final cleanup, then empty all containers.

    Calls ``self.force_cleanup()`` and then ``clear()`` on, in order,
    ``mesh_cache``, ``material_cache``, ``object_references``, ``parts``,
    ``zones`` and ``hardwares``. Any exception is logged and suppressed.
    """
    try:
        logger.info("开始关闭SUWood系统")
        self.force_cleanup()

        # Caches first, then the data structures; attributes are resolved
        # one at a time so the clearing order matches the listing above.
        for container_name in ("mesh_cache", "material_cache",
                               "object_references",
                               "parts", "zones", "hardwares"):
            getattr(self, container_name).clear()

        logger.info("✅ SUWood系统关闭完成")
    except Exception as err:
        logger.error(f"关闭系统失败: {err}")
def __del__(self):
    """Finalizer: make a best-effort call to ``shutdown()``.

    Ordinary errors raised by ``shutdown()`` are deliberately ignored,
    since a finalizer has no caller to report them to. Only ``Exception``
    subclasses are suppressed; ``KeyboardInterrupt`` and ``SystemExit`` are
    no longer swallowed here.
    """
    try:
        self.shutdown()
    except Exception:
        # Best effort only: cleanup failures at finalization are ignored.
        pass
# ==================== 模块级别的便利函数 ====================
def create_suw_instance():
    """Convenience wrapper returning ``SUWImpl.get_instance()``."""
    instance = SUWImpl.get_instance()
    return instance
def cleanup_blender_memory():
    """Clean up orphaned Blender data and run the garbage collector.

    Returns:
        The value returned by ``memory_manager.cleanup_orphaned_data()``,
        or ``0`` when the Blender API is unavailable or an exception occurs
        (the exception is logged).
    """
    try:
        if not BLENDER_AVAILABLE:
            return 0
        removed = memory_manager.cleanup_orphaned_data()
        gc.collect()
        logger.info(f"清理了 {removed} 个孤立数据")
        return removed
    except Exception as err:
        logger.error(f"清理内存失败: {err}")
        return 0
def get_memory_usage_summary():
    """Return a short summary of Blender data-block counts.

    Returns:
        With the Blender API available: a dict with the number of objects,
        meshes and materials in ``bpy.data`` plus a copy of
        ``memory_manager.creation_stats``. Without it:
        ``{"blender_available": False}``. On any exception the error is
        logged and ``{"error": <message>}`` is returned.
    """
    try:
        if not BLENDER_AVAILABLE:
            return {"blender_available": False}
        summary = {}
        summary["objects"] = len(bpy.data.objects)
        summary["meshes"] = len(bpy.data.meshes)
        summary["materials"] = len(bpy.data.materials)
        summary["memory_manager_stats"] = memory_manager.creation_stats.copy()
        return summary
    except Exception as err:
        logger.error(f"获取内存使用摘要失败: {err}")
        return {"error": str(err)}
# ==================== 主程序入口 ====================
if __name__ == "__main__":
    # Manual smoke test for the memory-management code paths.
    # Ensure INFO-level progress messages are actually emitted when run as
    # a script; basicConfig does nothing if the root logger already has
    # handlers configured.
    logging.basicConfig(level=logging.INFO)
    try:
        logger.info("开始测试SUWImpl内存管理")

        # Create and start the instance.
        suw = create_suw_instance()
        suw.startup()

        # Memory report.
        report = suw.get_memory_report()
        logger.info(f"内存报告: {report}")

        # Diagnosis: report both the healthy and the unhealthy outcome.
        issues = suw.diagnose_system_state()
        if not issues:
            logger.info("✅ 系统状态正常")
        else:
            logger.warning(f"系统诊断发现问题: {issues}")

        # Cleanup round-trip.
        cleanup_count = cleanup_blender_memory()
        logger.info(f"清理测试完成: {cleanup_count}")
    except Exception as e:
        logger.error(f"测试失败: {e}")
    finally:
        logger.info("测试完成")