1712 lines
58 KiB
Python
1712 lines
58 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
SUW Implementation - Python翻译版本
|
||
原文件: SUWImpl.rb (2019行)
|
||
用途: 核心实现类,SUWood的主要功能
|
||
|
||
翻译进度: Phase 1 - 几何类和基础框架
|
||
内存管理优化: 应用 Blender Python API 最佳实践
|
||
"""
|
||
|
||
import re
|
||
import math
|
||
import logging
|
||
import time
|
||
import gc
|
||
import weakref
|
||
from typing import Optional, Any, Dict, List, Tuple, Union
|
||
from contextlib import contextmanager
|
||
|
||
# 设置日志
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 尝试相对导入,失败则使用绝对导入
|
||
try:
|
||
from .suw_constants import SUWood
|
||
except ImportError:
|
||
try:
|
||
from suw_constants import SUWood
|
||
except ImportError:
|
||
# 如果都找不到,创建一个基本的存根
|
||
class SUWood:
|
||
@staticmethod
|
||
def suwood_path(version):
|
||
return "."
|
||
|
||
try:
|
||
import bpy
|
||
import mathutils
|
||
import bmesh
|
||
BLENDER_AVAILABLE = True
|
||
except ImportError:
|
||
BLENDER_AVAILABLE = False
|
||
print("⚠️ Blender API 不可用,使用基础几何类")
|
||
# 创建存根mathutils模块
|
||
|
||
class MockMathutils:
|
||
class Vector:
|
||
def __init__(self, vec):
|
||
self.x, self.y, self.z = vec[:3] if len(
|
||
vec) >= 3 else (vec + [0, 0])[:3]
|
||
|
||
def normalized(self):
|
||
return self
|
||
|
||
def dot(self, other):
|
||
return 0
|
||
|
||
class Matrix:
|
||
@staticmethod
|
||
def Scale(scale, size, axis):
|
||
return MockMathutils.Matrix()
|
||
|
||
@staticmethod
|
||
def Translation(vec):
|
||
return MockMathutils.Matrix()
|
||
|
||
@staticmethod
|
||
def Rotation(angle, size):
|
||
return MockMathutils.Matrix()
|
||
|
||
def __matmul__(self, other):
|
||
return MockMathutils.Matrix()
|
||
|
||
mathutils = MockMathutils()
|
||
|
||
# ==================== 内存管理核心类 ====================
|
||
|
||
|
||
class BlenderMemoryManager:
|
||
"""Blender 内存管理器 - 应用最佳实践"""
|
||
|
||
def __init__(self):
|
||
self.object_registry = weakref.WeakSet()
|
||
self.mesh_registry = weakref.WeakSet()
|
||
self.material_registry = weakref.WeakSet()
|
||
self.creation_stats = {
|
||
'objects_created': 0,
|
||
'meshes_created': 0,
|
||
'materials_created': 0,
|
||
'cleanup_calls': 0,
|
||
'failed_cleanups': 0
|
||
}
|
||
self.last_cleanup_time = time.time()
|
||
self.cleanup_interval = 300 # 5分钟
|
||
|
||
def register_object(self, obj):
|
||
"""注册对象到弱引用集合"""
|
||
if obj:
|
||
self.object_registry.add(obj)
|
||
self.creation_stats['objects_created'] += 1
|
||
|
||
def register_mesh(self, mesh):
|
||
"""注册网格到弱引用集合"""
|
||
if mesh:
|
||
self.mesh_registry.add(mesh)
|
||
self.creation_stats['meshes_created'] += 1
|
||
|
||
def should_cleanup(self):
|
||
"""检查是否应该执行清理"""
|
||
return time.time() - self.last_cleanup_time > self.cleanup_interval
|
||
|
||
def cleanup_orphaned_data(self):
|
||
"""清理孤立数据"""
|
||
if not BLENDER_AVAILABLE:
|
||
return 0
|
||
|
||
cleanup_count = 0
|
||
|
||
try:
|
||
# 清理孤立网格
|
||
for mesh in list(bpy.data.meshes):
|
||
if mesh.users == 0:
|
||
try:
|
||
bpy.data.meshes.remove(mesh, do_unlink=True)
|
||
cleanup_count += 1
|
||
except:
|
||
pass
|
||
|
||
# 清理孤立材质
|
||
for material in list(bpy.data.materials):
|
||
if material.users == 0:
|
||
try:
|
||
bpy.data.materials.remove(material, do_unlink=True)
|
||
cleanup_count += 1
|
||
except:
|
||
pass
|
||
|
||
# 清理孤立纹理
|
||
for texture in list(bpy.data.textures):
|
||
if texture.users == 0:
|
||
try:
|
||
bpy.data.textures.remove(texture, do_unlink=True)
|
||
cleanup_count += 1
|
||
except:
|
||
pass
|
||
|
||
# 更新清理时间
|
||
self.last_cleanup_time = time.time()
|
||
self.creation_stats['cleanup_calls'] += 1
|
||
|
||
logger.info(f"清理了 {cleanup_count} 个孤立数据块")
|
||
return cleanup_count
|
||
|
||
except Exception as e:
|
||
logger.error(f"清理孤立数据失败: {e}")
|
||
self.creation_stats['failed_cleanups'] += 1
|
||
return 0
|
||
|
||
|
||
# 全局内存管理器实例
|
||
memory_manager = BlenderMemoryManager()
|
||
|
||
|
||
@contextmanager
|
||
def safe_blender_operation(operation_name: str):
|
||
"""
|
||
安全的 Blender 操作上下文管理器
|
||
|
||
使用方法:
|
||
with safe_blender_operation("创建网格"):
|
||
# 执行 Blender 操作
|
||
pass
|
||
"""
|
||
logger.info(f"开始操作: {operation_name}")
|
||
start_time = time.time()
|
||
|
||
# 记录初始状态
|
||
initial_objects = len(bpy.data.objects) if BLENDER_AVAILABLE else 0
|
||
initial_meshes = len(bpy.data.meshes) if BLENDER_AVAILABLE else 0
|
||
|
||
try:
|
||
yield
|
||
|
||
except Exception as e:
|
||
logger.error(f"操作失败 {operation_name}: {e}")
|
||
|
||
# 尝试清理
|
||
try:
|
||
if memory_manager.should_cleanup():
|
||
memory_manager.cleanup_orphaned_data()
|
||
gc.collect()
|
||
except:
|
||
pass
|
||
|
||
raise
|
||
|
||
finally:
|
||
# 记录最终状态
|
||
final_objects = len(bpy.data.objects) if BLENDER_AVAILABLE else 0
|
||
final_meshes = len(bpy.data.meshes) if BLENDER_AVAILABLE else 0
|
||
|
||
elapsed_time = time.time() - start_time
|
||
|
||
logger.info(f"操作完成: {operation_name}")
|
||
logger.info(f"耗时: {elapsed_time:.2f}秒")
|
||
logger.info(f"对象变化: {final_objects - initial_objects}")
|
||
logger.info(f"网格变化: {final_meshes - initial_meshes}")
|
||
|
||
# ==================== 几何类扩展 ====================
|
||
|
||
|
||
class Point3d:
|
||
"""3D点类 - 对应Ruby的Geom::Point3d"""
|
||
|
||
def __init__(self, x: float = 0.0, y: float = 0.0, z: float = 0.0):
|
||
self.x = x
|
||
self.y = y
|
||
self.z = z
|
||
|
||
@classmethod
|
||
def parse(cls, value: str):
|
||
"""从字符串解析3D点"""
|
||
if not value or value.strip() == "":
|
||
return None
|
||
|
||
# 解析格式: "(x,y,z)" 或 "x,y,z"
|
||
clean_value = re.sub(r'[()]*', '', value)
|
||
xyz = [float(axis.strip()) for axis in clean_value.split(',')]
|
||
|
||
# 转换mm为内部单位(假设输入是mm)
|
||
return cls(xyz[0] * 0.001, xyz[1] * 0.001, xyz[2] * 0.001)
|
||
|
||
def to_s(self, unit: str = "mm", digits: int = -1) -> str:
|
||
"""转换为字符串"""
|
||
if unit == "cm":
|
||
x_val = self.x * 100 # 内部单位转换为cm
|
||
y_val = self.y * 100
|
||
z_val = self.z * 100
|
||
return f"({x_val:.3f}, {y_val:.3f}, {z_val:.3f})"
|
||
else: # mm
|
||
x_val = self.x * 1000 # 内部单位转换为mm
|
||
y_val = self.y * 1000
|
||
z_val = self.z * 1000
|
||
|
||
if digits == -1:
|
||
return f"({x_val}, {y_val}, {z_val})"
|
||
else:
|
||
return f"({x_val:.{digits}f}, {y_val:.{digits}f}, {z_val:.{digits}f})"
|
||
|
||
def __str__(self):
|
||
return self.to_s()
|
||
|
||
def __repr__(self):
|
||
return f"Point3d({self.x}, {self.y}, {self.z})"
|
||
|
||
|
||
class Vector3d:
|
||
"""3D向量类 - 对应Ruby的Geom::Vector3d"""
|
||
|
||
def __init__(self, x: float = 0.0, y: float = 0.0, z: float = 0.0):
|
||
self.x = x
|
||
self.y = y
|
||
self.z = z
|
||
|
||
@classmethod
|
||
def parse(cls, value: str):
|
||
"""从字符串解析3D向量"""
|
||
if not value or value.strip() == "":
|
||
return None
|
||
|
||
clean_value = re.sub(r'[()]*', '', value)
|
||
xyz = [float(axis.strip()) for axis in clean_value.split(',')]
|
||
|
||
return cls(xyz[0] * 0.001, xyz[1] * 0.001, xyz[2] * 0.001)
|
||
|
||
def to_s(self, unit: str = "mm") -> str:
|
||
"""转换为字符串"""
|
||
if unit == "cm":
|
||
x_val = self.x * 100 # 内部单位转换为cm
|
||
y_val = self.y * 100
|
||
z_val = self.z * 100
|
||
return f"({x_val:.3f}, {y_val:.3f}, {z_val:.3f})"
|
||
elif unit == "in":
|
||
return f"({self.x}, {self.y}, {self.z})"
|
||
else: # mm
|
||
x_val = self.x * 1000 # 内部单位转换为mm
|
||
y_val = self.y * 1000
|
||
z_val = self.z * 1000
|
||
return f"({x_val}, {y_val}, {z_val})"
|
||
|
||
def normalize(self):
|
||
"""归一化向量"""
|
||
length = math.sqrt(self.x**2 + self.y**2 + self.z**2)
|
||
if length > 0:
|
||
return Vector3d(self.x/length, self.y/length, self.z/length)
|
||
return Vector3d(0, 0, 0)
|
||
|
||
def __str__(self):
|
||
return self.to_s()
|
||
|
||
|
||
class Transformation:
|
||
"""变换矩阵类 - 对应Ruby的Geom::Transformation"""
|
||
|
||
def __init__(self, origin: Point3d = None, x_axis: Vector3d = None,
|
||
y_axis: Vector3d = None, z_axis: Vector3d = None):
|
||
self.origin = origin or Point3d(0, 0, 0)
|
||
self.x_axis = x_axis or Vector3d(1, 0, 0)
|
||
self.y_axis = y_axis or Vector3d(0, 1, 0)
|
||
self.z_axis = z_axis or Vector3d(0, 0, 1)
|
||
|
||
@classmethod
|
||
def parse(cls, data: Dict[str, str]):
|
||
"""从字典解析变换"""
|
||
origin = Point3d.parse(data.get("o"))
|
||
x_axis = Vector3d.parse(data.get("x"))
|
||
y_axis = Vector3d.parse(data.get("y"))
|
||
z_axis = Vector3d.parse(data.get("z"))
|
||
|
||
return cls(origin, x_axis, y_axis, z_axis)
|
||
|
||
def store(self, data: Dict[str, str]):
|
||
"""存储变换到字典"""
|
||
data["o"] = self.origin.to_s("mm")
|
||
data["x"] = self.x_axis.to_s("in")
|
||
data["y"] = self.y_axis.to_s("in")
|
||
data["z"] = self.z_axis.to_s("in")
|
||
|
||
# ==================== SUWood 材质类型常量 ====================
|
||
|
||
|
||
MAT_TYPE_NORMAL = 0
|
||
MAT_TYPE_OBVERSE = 1
|
||
MAT_TYPE_NATURE = 2
|
||
|
||
# ==================== SUWImpl 核心实现类 ====================
|
||
|
||
|
||
class SUWImpl:
|
||
"""SUWood核心实现类 - 完整翻译版本,应用内存管理最佳实践"""
|
||
|
||
_instance = None
|
||
_selected_uid = None
|
||
_selected_obj = None
|
||
_selected_zone = None
|
||
_selected_part = None
|
||
_scaled_zone = None
|
||
_server_path = None
|
||
_default_zone = None
|
||
_creation_lock = False
|
||
_mesh_creation_count = 0
|
||
_batch_operation_active = False
|
||
|
||
def __init__(self):
|
||
"""初始化SUWImpl实例"""
|
||
# 基础属性
|
||
self.parts = {}
|
||
self.zones = {}
|
||
self.hardwares = {}
|
||
self.materials = {}
|
||
self.textures = {}
|
||
|
||
# 内存管理相关
|
||
self.object_references = {} # 存储对象名称而非引用
|
||
self.mesh_cache = {}
|
||
self.material_cache = {}
|
||
|
||
# 批量操作优化
|
||
self.deferred_updates = []
|
||
self.batch_size = 50
|
||
|
||
logger.info("SUWImpl 初始化完成,启用内存管理优化")
|
||
|
||
@classmethod
|
||
def get_instance(cls):
|
||
"""获取单例实例"""
|
||
if cls._instance is None:
|
||
cls._instance = cls()
|
||
return cls._instance
|
||
|
||
def startup(self):
|
||
"""启动SUWood系统"""
|
||
logger.info("启动SUWood系统...")
|
||
|
||
if BLENDER_AVAILABLE:
|
||
try:
|
||
self._create_layers()
|
||
self._init_materials()
|
||
self._init_default_zone()
|
||
logger.info("✅ SUWood系统启动成功")
|
||
except Exception as e:
|
||
logger.error(f"❌ SUWood系统启动失败: {e}")
|
||
raise
|
||
else:
|
||
logger.warning("⚠️ Blender不可用,使用基础模式")
|
||
|
||
def _create_layers(self):
|
||
"""创建图层"""
|
||
if not BLENDER_AVAILABLE:
|
||
return
|
||
|
||
try:
|
||
# 创建必要的集合
|
||
collections = ["SUWood_Parts", "SUWood_Hardware", "SUWood_Zones"]
|
||
|
||
for collection_name in collections:
|
||
if collection_name not in bpy.data.collections:
|
||
collection = bpy.data.collections.new(collection_name)
|
||
bpy.context.scene.collection.children.link(collection)
|
||
logger.info(f"创建集合: {collection_name}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建图层失败: {e}")
|
||
|
||
def _init_materials(self):
|
||
"""初始化材质"""
|
||
if not BLENDER_AVAILABLE:
|
||
return
|
||
|
||
try:
|
||
# 创建默认材质
|
||
default_materials = [
|
||
("SUWood_Default", (0.8, 0.8, 0.8, 1.0)),
|
||
("SUWood_Wood", (0.6, 0.4, 0.2, 1.0)),
|
||
("SUWood_Metal", (0.7, 0.7, 0.8, 1.0)),
|
||
]
|
||
|
||
for mat_name, color in default_materials:
|
||
if mat_name not in bpy.data.materials:
|
||
material = bpy.data.materials.new(mat_name)
|
||
material.use_nodes = True
|
||
|
||
if material.node_tree:
|
||
principled = material.node_tree.nodes.get(
|
||
"Principled BSDF")
|
||
if principled:
|
||
principled.inputs[0].default_value = color
|
||
|
||
self.material_cache[mat_name] = material.name # 存储名称而非引用
|
||
memory_manager.register_object(material)
|
||
logger.info(f"创建材质: {mat_name}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"初始化材质失败: {e}")
|
||
|
||
def _init_default_zone(self):
|
||
"""初始化默认区域"""
|
||
try:
|
||
self._default_zone = {
|
||
"name": "Default Zone",
|
||
"id": "default_zone_001",
|
||
"visible": True,
|
||
"parts": {},
|
||
"hardwares": {}
|
||
}
|
||
|
||
self.zones["default"] = self._default_zone
|
||
logger.info("初始化默认区域")
|
||
|
||
except Exception as e:
|
||
logger.error(f"初始化默认区域失败: {e}")
|
||
|
||
# ==================== 内存管理优化的核心方法 ====================
|
||
|
||
def validate_mesh_data(self, vertices: List, faces: List) -> bool:
|
||
"""验证网格数据有效性"""
|
||
try:
|
||
# 检查顶点数量
|
||
if len(vertices) < 3:
|
||
logger.warning("顶点数量不足")
|
||
return False
|
||
|
||
# 检查面数据
|
||
for face in faces:
|
||
if len(face) < 3:
|
||
logger.warning("面顶点数量不足")
|
||
return False
|
||
|
||
# 检查顶点索引有效性
|
||
for vertex_index in face:
|
||
if vertex_index >= len(vertices) or vertex_index < 0:
|
||
logger.warning(f"无效的顶点索引: {vertex_index}")
|
||
return False
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"网格数据验证失败: {e}")
|
||
return False
|
||
|
||
def validate_coplanar_points(self, points: List[Tuple[float, float, float]]) -> bool:
|
||
"""验证点是否共面"""
|
||
if len(points) < 4:
|
||
return True # 3个点总是共面的
|
||
|
||
try:
|
||
# 使用前三个点定义平面
|
||
p1 = mathutils.Vector(points[0])
|
||
p2 = mathutils.Vector(points[1])
|
||
p3 = mathutils.Vector(points[2])
|
||
|
||
# 计算平面法向量
|
||
normal = (p2 - p1).cross(p3 - p1).normalized()
|
||
|
||
# 检查其他点是否在同一平面上
|
||
tolerance = 1e-6
|
||
for point in points[3:]:
|
||
p = mathutils.Vector(point)
|
||
distance = abs((p - p1).dot(normal))
|
||
if distance > tolerance:
|
||
return False
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"共面检查失败: {e}")
|
||
return False
|
||
|
||
@contextmanager
|
||
def atomic_mesh_creation(self, name: str):
|
||
"""原子网格创建上下文管理器"""
|
||
mesh = None
|
||
obj = None
|
||
|
||
try:
|
||
# 第1步:创建网格数据
|
||
mesh = bpy.data.meshes.new(name)
|
||
memory_manager.register_mesh(mesh)
|
||
|
||
yield mesh
|
||
|
||
except Exception as e:
|
||
logger.error(f"原子网格创建失败: {e}")
|
||
# 自动清理
|
||
if obj:
|
||
self._cleanup_object_safe(obj)
|
||
if mesh:
|
||
self._cleanup_mesh_safe(mesh)
|
||
raise
|
||
|
||
def _cleanup_mesh_safe(self, mesh):
|
||
"""安全清理网格"""
|
||
try:
|
||
if mesh and mesh.name in bpy.data.meshes:
|
||
bpy.data.meshes.remove(mesh, do_unlink=True)
|
||
memory_manager.creation_stats['cleanup_calls'] += 1
|
||
logger.info(f"清理网格: {mesh.name}")
|
||
except Exception as e:
|
||
logger.error(f"网格清理失败: {e}")
|
||
memory_manager.creation_stats['failed_cleanups'] += 1
|
||
|
||
def _cleanup_object_safe(self, obj):
|
||
"""安全清理对象"""
|
||
try:
|
||
if obj and obj.name in bpy.data.objects:
|
||
# 从所有集合中移除
|
||
for collection in obj.users_collection:
|
||
collection.objects.unlink(obj)
|
||
# 删除对象
|
||
bpy.data.objects.remove(obj, do_unlink=True)
|
||
memory_manager.creation_stats['cleanup_calls'] += 1
|
||
logger.info(f"清理对象: {obj.name}")
|
||
except Exception as e:
|
||
logger.error(f"对象清理失败: {e}")
|
||
memory_manager.creation_stats['failed_cleanups'] += 1
|
||
|
||
def _create_single_mesh_atomic(self, container: Any, surface: Dict[str, Any],
|
||
color: str = None, scale: float = None, angle: float = None,
|
||
series: List = None, uid: str = None, zid: Any = None,
|
||
pid: Any = None, child: Any = None, options: Dict = None) -> Any:
|
||
"""优化的原子性mesh创建"""
|
||
|
||
with safe_blender_operation(f"创建网格 uid={uid}"):
|
||
try:
|
||
# 增加创建计数
|
||
self.__class__._mesh_creation_count += 1
|
||
creation_count = self.__class__._mesh_creation_count
|
||
|
||
# 定期清理
|
||
if creation_count % 100 == 0:
|
||
logger.info(f"执行定期清理,当前创建数: {creation_count}")
|
||
memory_manager.cleanup_orphaned_data()
|
||
gc.collect()
|
||
|
||
# 数据验证
|
||
segs = surface.get("segs", [])
|
||
points = []
|
||
for seg in segs:
|
||
if isinstance(seg, list) and len(seg) >= 1:
|
||
point_str = seg[0] if isinstance(
|
||
seg[0], str) else str(seg[0])
|
||
try:
|
||
point = Point3d.parse(point_str)
|
||
if point:
|
||
points.append(point)
|
||
except Exception as e:
|
||
logger.warning(f"点解析失败: {point_str}")
|
||
|
||
if len(points) < 3:
|
||
logger.warning(f"有效点数不足: {len(points)}")
|
||
return None
|
||
|
||
# 面积验证
|
||
area = self._calculate_face_area(points)
|
||
if area < 1e-6:
|
||
logger.warning(f"面积过小: {area}")
|
||
return None
|
||
|
||
# 验证几何数据
|
||
vertices = [(float(p.x), float(p.y), float(p.z))
|
||
for p in points]
|
||
faces = [list(range(len(vertices)))] if len(
|
||
vertices) >= 3 else []
|
||
|
||
if not self.validate_mesh_data(vertices, faces):
|
||
logger.warning("网格数据验证失败")
|
||
return None
|
||
|
||
# 原子性创建
|
||
timestamp = hash(
|
||
str(points[0]) + str(uid) + str(child) + str(creation_count))
|
||
mesh_name = f"SUWMesh_{abs(timestamp) % 10000}"
|
||
|
||
with self.atomic_mesh_creation(mesh_name) as mesh:
|
||
# 设置几何数据
|
||
mesh.from_pydata(vertices, [], faces)
|
||
|
||
# 创建UV坐标
|
||
if len(vertices) >= 3:
|
||
try:
|
||
uv_layer = mesh.uv_layers.new(name="UVMap")
|
||
self._generate_uv_coordinates(mesh, uv_layer)
|
||
except Exception as e:
|
||
logger.warning(f"UV坐标创建失败: {e}")
|
||
|
||
# 安全更新网格
|
||
if not self._safe_mesh_update(mesh):
|
||
raise RuntimeError("网格更新失败")
|
||
|
||
# 创建对象
|
||
obj_name = f"SUWObj_{abs(timestamp) % 10000}"
|
||
obj = bpy.data.objects.new(obj_name, mesh)
|
||
memory_manager.register_object(obj)
|
||
|
||
# 添加到集合
|
||
if hasattr(container, 'objects'):
|
||
container.objects.link(obj)
|
||
else:
|
||
bpy.context.scene.collection.objects.link(obj)
|
||
|
||
# 设置属性
|
||
self._set_object_attributes_safe(
|
||
obj, uid, zid, pid, child, area, creation_count)
|
||
|
||
# 应用材质
|
||
if color:
|
||
self._apply_material_safe(obj, color)
|
||
|
||
# 存储对象引用(名称而非对象)
|
||
self.object_references[obj_name] = {
|
||
'uid': uid,
|
||
'type': 'mesh',
|
||
'creation_time': time.time()
|
||
}
|
||
|
||
logger.info(f"✅ 网格创建成功: {obj.name}, {len(vertices)}顶点")
|
||
return obj
|
||
|
||
except Exception as e:
|
||
logger.error(f"网格创建失败: {e}")
|
||
return None
|
||
|
||
def _generate_uv_coordinates(self, mesh, uv_layer):
|
||
"""生成UV坐标"""
|
||
try:
|
||
for poly in mesh.polygons:
|
||
for i, loop_index in enumerate(poly.loop_indices):
|
||
vertex_count = len(poly.vertices)
|
||
if vertex_count == 4: # 四边形
|
||
uv_coords = [(0, 0), (1, 0), (1, 1), (0, 1)]
|
||
uv_layer.data[loop_index].uv = uv_coords[i % 4]
|
||
elif vertex_count == 3: # 三角形
|
||
uv_coords = [(0, 0), (1, 0), (0.5, 1)]
|
||
uv_layer.data[loop_index].uv = uv_coords[i % 3]
|
||
else: # 其他多边形
|
||
angle = (i / vertex_count) * 2 * math.pi
|
||
u = (math.cos(angle) + 1) * 0.5
|
||
v = (math.sin(angle) + 1) * 0.5
|
||
uv_layer.data[loop_index].uv = (u, v)
|
||
except Exception as e:
|
||
logger.error(f"UV坐标生成失败: {e}")
|
||
|
||
def _safe_mesh_update(self, mesh, max_retries=3):
|
||
"""安全更新网格"""
|
||
for attempt in range(max_retries):
|
||
try:
|
||
# 检查网格是否仍然有效
|
||
if not mesh or mesh.name not in bpy.data.meshes:
|
||
return False
|
||
|
||
# 更新网格
|
||
mesh.update()
|
||
|
||
# 验证更新结果
|
||
if not mesh.vertices:
|
||
raise RuntimeError("网格更新后无顶点")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.warning(f"网格更新失败,尝试 {attempt + 1}/{max_retries}: {e}")
|
||
|
||
if attempt < max_retries - 1:
|
||
time.sleep(0.1) # 短暂等待
|
||
else:
|
||
return False
|
||
|
||
return False
|
||
|
||
def _set_object_attributes_safe(self, obj, uid, zid, pid, child, area, creation_count):
|
||
"""安全设置对象属性"""
|
||
try:
|
||
obj["uid"] = str(uid) if uid else "unknown"
|
||
obj["zid"] = str(zid) if zid else "unknown"
|
||
obj["pid"] = str(pid) if pid else "unknown"
|
||
obj["child"] = str(child) if child else "unknown"
|
||
obj["area"] = float(area) if area else 0.0
|
||
obj["creation_index"] = creation_count
|
||
obj["creation_time"] = time.time()
|
||
obj["memory_managed"] = True
|
||
except Exception as e:
|
||
logger.error(f"设置对象属性失败: {e}")
|
||
|
||
def _apply_material_safe(self, obj, color):
|
||
"""安全应用材质"""
|
||
try:
|
||
material = self.get_texture(color)
|
||
if material:
|
||
if obj.data.materials:
|
||
obj.data.materials[0] = material
|
||
else:
|
||
obj.data.materials.append(material)
|
||
logger.info(f"材质应用成功: {color}")
|
||
else:
|
||
self._apply_simple_material(obj, color)
|
||
except Exception as e:
|
||
logger.error(f"材质应用失败: {e}")
|
||
try:
|
||
self._apply_simple_material(obj, color)
|
||
except:
|
||
pass
|
||
|
||
def _apply_simple_material(self, obj: Any, color: str):
|
||
"""应用简单材质"""
|
||
try:
|
||
# 创建或获取缓存的材质
|
||
mat_name = f"SUWMat_{color}" if color else "SUWMat_default"
|
||
|
||
if mat_name in self.material_cache:
|
||
material_name = self.material_cache[mat_name]
|
||
if material_name in bpy.data.materials:
|
||
material = bpy.data.materials[material_name]
|
||
obj.data.materials.append(material)
|
||
return
|
||
|
||
# 创建新材质
|
||
material = bpy.data.materials.new(mat_name)
|
||
material.use_nodes = True
|
||
|
||
if material.node_tree and material.node_tree.nodes:
|
||
principled = material.node_tree.nodes.get("Principled BSDF")
|
||
if principled:
|
||
if color and len(color) >= 6:
|
||
try:
|
||
# 解析颜色
|
||
r = int(color[0:2], 16) / 255.0
|
||
g = int(color[2:4], 16) / 255.0
|
||
b = int(color[4:6], 16) / 255.0
|
||
principled.inputs[0].default_value = (r, g, b, 1.0)
|
||
except:
|
||
principled.inputs[0].default_value = (
|
||
0.8, 0.8, 0.8, 1.0)
|
||
else:
|
||
principled.inputs[0].default_value = (
|
||
0.8, 0.8, 0.8, 1.0)
|
||
|
||
# 缓存材质
|
||
self.material_cache[mat_name] = material.name
|
||
memory_manager.register_object(material)
|
||
|
||
# 应用材质
|
||
obj.data.materials.append(material)
|
||
logger.info(f"简单材质创建并应用: {mat_name}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"简单材质应用失败: {e}")
|
||
|
||
# ==================== 优化的删除和清理方法 ====================
|
||
|
||
def _force_delete_objects_optimized(self, objects_to_delete: List[Any]):
|
||
"""优化的强制删除对象"""
|
||
try:
|
||
if not BLENDER_AVAILABLE:
|
||
return
|
||
|
||
logger.info(f"开始优化删除 {len(objects_to_delete)} 个对象")
|
||
|
||
# 预处理:收集对象名称
|
||
object_names = []
|
||
for obj in objects_to_delete:
|
||
if obj and hasattr(obj, 'name'):
|
||
object_names.append(obj.name)
|
||
|
||
# 批量删除
|
||
deleted_count = 0
|
||
for obj_name in object_names:
|
||
try:
|
||
if obj_name in bpy.data.objects:
|
||
obj = bpy.data.objects[obj_name]
|
||
|
||
# 从集合中移除
|
||
collections_to_unlink = list(obj.users_collection)
|
||
for collection in collections_to_unlink:
|
||
try:
|
||
collection.objects.unlink(obj)
|
||
except:
|
||
pass
|
||
|
||
# 删除对象
|
||
bpy.data.objects.remove(obj, do_unlink=True)
|
||
deleted_count += 1
|
||
|
||
# 从引用中移除
|
||
if obj_name in self.object_references:
|
||
del self.object_references[obj_name]
|
||
|
||
except Exception as e:
|
||
logger.error(f"删除对象失败 {obj_name}: {e}")
|
||
|
||
# 更新依赖图
|
||
if deleted_count > 0:
|
||
self._update_dependency_graph(full_update=True)
|
||
logger.info(f"✅ 优化删除完成: {deleted_count} 个对象")
|
||
|
||
except Exception as e:
|
||
logger.error(f"优化删除失败: {e}")
|
||
|
||
def _update_dependency_graph(self, full_update: bool = False):
|
||
"""更新Blender依赖图"""
|
||
try:
|
||
if not BLENDER_AVAILABLE:
|
||
return
|
||
|
||
if full_update:
|
||
logger.info("执行全局依赖图更新")
|
||
bpy.context.view_layer.update()
|
||
bpy.context.evaluated_depsgraph_get().update()
|
||
|
||
# 刷新视图
|
||
try:
|
||
for area in bpy.context.screen.areas:
|
||
if area.type in ['VIEW_3D', 'OUTLINER']:
|
||
area.tag_redraw()
|
||
except:
|
||
pass
|
||
|
||
logger.info("全局依赖图更新完成")
|
||
else:
|
||
# 快速更新
|
||
bpy.context.view_layer.update()
|
||
|
||
except Exception as e:
|
||
logger.error(f"依赖图更新失败: {e}")
|
||
|
||
# ==================== 保持原有业务逻辑的方法 ====================
|
||
|
||
def get_zones(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""获取区域信息"""
|
||
return self.zones
|
||
|
||
def get_parts(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""获取零件信息"""
|
||
return self.parts
|
||
|
||
def get_hardwares(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""获取硬件信息"""
|
||
return self.hardwares
|
||
|
||
def get_texture(self, key: str):
|
||
"""获取纹理材质"""
|
||
if not BLENDER_AVAILABLE:
|
||
return None
|
||
|
||
try:
|
||
# 从缓存中获取
|
||
if key in self.material_cache:
|
||
material_name = self.material_cache[key]
|
||
if material_name in bpy.data.materials:
|
||
return bpy.data.materials[material_name]
|
||
|
||
# 查找现有材质
|
||
for material in bpy.data.materials:
|
||
if key in material.name:
|
||
self.material_cache[key] = material.name
|
||
return material
|
||
|
||
return None
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取纹理失败: {e}")
|
||
return None
|
||
|
||
def create_face(self, container: Any, surface: Dict[str, Any], color: str = None,
|
||
scale: float = None, angle: float = None, series: List = None,
|
||
uid: str = None, zid: Any = None, pid: Any = None,
|
||
child: Any = None) -> Any:
|
||
"""创建面 - 使用优化的内存管理"""
|
||
try:
|
||
return self._create_single_mesh_atomic(
|
||
container, surface, color, scale, angle,
|
||
series, uid, zid, pid, child
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"创建面失败: {e}")
|
||
return None
|
||
|
||
def _calculate_face_area(self, points: List[Point3d]) -> float:
|
||
"""计算面积"""
|
||
try:
|
||
if len(points) < 3:
|
||
return 0.0
|
||
|
||
# 使用三角剖分计算面积
|
||
area = 0.0
|
||
for i in range(1, len(points) - 1):
|
||
p1 = points[0]
|
||
p2 = points[i]
|
||
p3 = points[i + 1]
|
||
|
||
# 计算三角形面积
|
||
v1 = Vector3d(p2.x - p1.x, p2.y - p1.y, p2.z - p1.z)
|
||
v2 = Vector3d(p3.x - p1.x, p3.y - p1.y, p3.z - p1.z)
|
||
|
||
# 叉积的模长的一半就是三角形面积
|
||
cross_x = v1.y * v2.z - v1.z * v2.y
|
||
cross_y = v1.z * v2.x - v1.x * v2.z
|
||
cross_z = v1.x * v2.y - v1.y * v2.x
|
||
|
||
triangle_area = 0.5 * \
|
||
math.sqrt(cross_x**2 + cross_y**2 + cross_z**2)
|
||
area += triangle_area
|
||
|
||
return area
|
||
|
||
except Exception as e:
|
||
logger.error(f"计算面积失败: {e}")
|
||
return 0.0
|
||
|
||
# ==================== 系统管理方法 ====================
|
||
|
||
def get_creation_stats(self) -> Dict[str, Any]:
|
||
"""获取创建统计信息"""
|
||
try:
|
||
stats = {
|
||
"creation_count": self.__class__._mesh_creation_count,
|
||
"object_references": len(self.object_references),
|
||
"mesh_cache_size": len(self.mesh_cache),
|
||
"material_cache_size": len(self.material_cache),
|
||
"memory_manager_stats": memory_manager.creation_stats.copy(),
|
||
"blender_available": BLENDER_AVAILABLE
|
||
}
|
||
|
||
if BLENDER_AVAILABLE:
|
||
stats["total_objects"] = len(bpy.data.objects)
|
||
stats["total_meshes"] = len(bpy.data.meshes)
|
||
stats["total_materials"] = len(bpy.data.materials)
|
||
|
||
return stats
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取统计信息失败: {e}")
|
||
return {"error": str(e)}
|
||
|
||
def force_cleanup(self):
|
||
"""强制清理"""
|
||
try:
|
||
logger.info("开始强制清理")
|
||
|
||
# 清理孤立数据
|
||
cleanup_count = memory_manager.cleanup_orphaned_data()
|
||
|
||
# 清理缓存
|
||
self.mesh_cache.clear()
|
||
|
||
# 清理过期的对象引用
|
||
current_time = time.time()
|
||
expired_refs = []
|
||
for obj_name, ref_info in self.object_references.items():
|
||
if current_time - ref_info.get('creation_time', 0) > 3600: # 1小时
|
||
expired_refs.append(obj_name)
|
||
|
||
for obj_name in expired_refs:
|
||
del self.object_references[obj_name]
|
||
|
||
# 强制垃圾回收
|
||
gc.collect()
|
||
|
||
# 更新依赖图
|
||
if BLENDER_AVAILABLE:
|
||
self._update_dependency_graph(full_update=True)
|
||
|
||
logger.info(
|
||
f"强制清理完成: 清理了 {cleanup_count} 个数据块,{len(expired_refs)} 个过期引用")
|
||
|
||
except Exception as e:
|
||
logger.error(f"强制清理失败: {e}")
|
||
|
||
def diagnose_system_state(self):
|
||
"""诊断系统状态"""
|
||
try:
|
||
logger.info("=== 系统状态诊断 ===")
|
||
|
||
# 内存使用情况
|
||
stats = self.get_creation_stats()
|
||
for key, value in stats.items():
|
||
logger.info(f"{key}: {value}")
|
||
|
||
# 检查潜在问题
|
||
issues = []
|
||
|
||
if BLENDER_AVAILABLE:
|
||
# 检查孤立数据
|
||
orphaned_meshes = [m for m in bpy.data.meshes if m.users == 0]
|
||
if orphaned_meshes:
|
||
issues.append(f"发现 {len(orphaned_meshes)} 个孤立网格")
|
||
|
||
# 检查空网格
|
||
empty_meshes = [m for m in bpy.data.meshes if not m.vertices]
|
||
if empty_meshes:
|
||
issues.append(f"发现 {len(empty_meshes)} 个空网格")
|
||
|
||
# 检查总顶点数
|
||
total_vertices = sum(len(m.vertices) for m in bpy.data.meshes)
|
||
if total_vertices > 1000000: # 100万顶点
|
||
issues.append(f"顶点数量过多: {total_vertices}")
|
||
|
||
if issues:
|
||
logger.warning("发现问题:")
|
||
for issue in issues:
|
||
logger.warning(f" - {issue}")
|
||
else:
|
||
logger.info("✅ 系统状态正常")
|
||
|
||
return issues
|
||
|
||
except Exception as e:
|
||
logger.error(f"系统诊断失败: {e}")
|
||
return [f"诊断失败: {e}"]
|
||
|
||
# ==================== 其他必要的方法存根 ====================
|
||
|
||
def c09(self, data: Dict[str, Any]):
|
||
"""删除操作 - 使用优化的内存管理"""
|
||
try:
|
||
uid = data.get("uid")
|
||
typ = data.get("typ", "uid")
|
||
oid = data.get("oid")
|
||
|
||
logger.info(f"执行删除操作: uid={uid}, typ={typ}, oid={oid}")
|
||
|
||
# 查找要删除的对象
|
||
objects_to_delete = self._find_objects_for_deletion(uid, typ, oid)
|
||
|
||
if objects_to_delete:
|
||
# 使用优化的删除方法
|
||
self._force_delete_objects_optimized(objects_to_delete)
|
||
|
||
# 清理数据
|
||
self._safe_data_cleanup(uid, typ, oid)
|
||
|
||
except Exception as e:
|
||
logger.error(f"删除操作失败: {e}")
|
||
|
||
def _find_objects_for_deletion(self, uid: str, typ: str, oid: Any) -> List[Any]:
|
||
"""查找要删除的对象"""
|
||
try:
|
||
if not BLENDER_AVAILABLE:
|
||
return []
|
||
|
||
objects_to_delete = []
|
||
|
||
# 根据对象引用查找
|
||
for obj_name, ref_info in self.object_references.items():
|
||
if ref_info.get('uid') == uid:
|
||
if obj_name in bpy.data.objects:
|
||
objects_to_delete.append(bpy.data.objects[obj_name])
|
||
|
||
# 根据对象属性查找
|
||
for obj in bpy.data.objects:
|
||
if obj.get("uid") == uid:
|
||
objects_to_delete.append(obj)
|
||
|
||
logger.info(f"找到 {len(objects_to_delete)} 个要删除的对象")
|
||
return objects_to_delete
|
||
|
||
except Exception as e:
|
||
logger.error(f"查找删除对象失败: {e}")
|
||
return []
|
||
|
||
def _safe_data_cleanup(self, uid: str, typ: str, oid: Any):
|
||
"""安全的数据清理"""
|
||
try:
|
||
cleanup_count = 0
|
||
|
||
# 清理内部数据结构
|
||
data_fields = [self.parts, self.zones, self.hardwares]
|
||
for data_dict in data_fields:
|
||
if uid in data_dict:
|
||
del data_dict[uid]
|
||
cleanup_count += 1
|
||
|
||
# 清理对象引用
|
||
refs_to_remove = []
|
||
for obj_name, ref_info in self.object_references.items():
|
||
if ref_info.get('uid') == uid:
|
||
refs_to_remove.append(obj_name)
|
||
|
||
for obj_name in refs_to_remove:
|
||
del self.object_references[obj_name]
|
||
cleanup_count += 1
|
||
|
||
logger.info(f"数据清理完成: {cleanup_count} 项")
|
||
|
||
except Exception as e:
|
||
logger.error(f"数据清理失败: {e}")
|
||
|
||
# ==================== 其他业务逻辑方法的存根 ====================
|
||
|
||
def c00(self, data: Dict[str, Any]):
|
||
"""业务方法存根"""
|
||
pass
|
||
|
||
def c01(self, data: Dict[str, Any]):
|
||
"""创建区域"""
|
||
try:
|
||
uid = data.get("uid")
|
||
zone_data = data.get("zone", {})
|
||
|
||
with safe_blender_operation(f"创建区域 uid={uid}"):
|
||
# 创建区域集合
|
||
collection_name = f"Zone_{uid}"
|
||
if BLENDER_AVAILABLE and collection_name not in bpy.data.collections:
|
||
collection = bpy.data.collections.new(collection_name)
|
||
bpy.context.scene.collection.children.link(collection)
|
||
memory_manager.register_object(collection)
|
||
|
||
# 存储区域数据
|
||
self.zones[uid] = {
|
||
"collection_name": collection_name,
|
||
"data": zone_data,
|
||
"parts": {},
|
||
"creation_time": time.time()
|
||
}
|
||
|
||
logger.info(f"创建区域成功: {uid}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建区域失败: {e}")
|
||
|
||
def c02(self, data: Dict[str, Any]):
|
||
"""创建零件"""
|
||
try:
|
||
uid = data.get("uid")
|
||
parts_data = data.get("parts", {})
|
||
|
||
with safe_blender_operation(f"创建零件 uid={uid}"):
|
||
# 批量创建零件
|
||
created_parts = {}
|
||
|
||
for part_id, part_data in parts_data.items():
|
||
try:
|
||
# 创建零件集合
|
||
collection_name = f"Part_{uid}_{part_id}"
|
||
if BLENDER_AVAILABLE and collection_name not in bpy.data.collections:
|
||
collection = bpy.data.collections.new(
|
||
collection_name)
|
||
|
||
# 添加到对应的区域集合
|
||
if uid in self.zones:
|
||
zone_collection_name = self.zones[uid]["collection_name"]
|
||
if zone_collection_name in bpy.data.collections:
|
||
parent_collection = bpy.data.collections[zone_collection_name]
|
||
parent_collection.children.link(collection)
|
||
else:
|
||
bpy.context.scene.collection.children.link(
|
||
collection)
|
||
|
||
memory_manager.register_object(collection)
|
||
|
||
created_parts[part_id] = {
|
||
"collection_name": collection_name,
|
||
"data": part_data,
|
||
"creation_time": time.time()
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建零件失败 {part_id}: {e}")
|
||
|
||
# 存储零件数据
|
||
self.parts[uid] = created_parts
|
||
logger.info(f"创建零件成功: {uid}, {len(created_parts)} 个零件")
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建零件操作失败: {e}")
|
||
|
||
def c03(self, data: Dict[str, Any]):
|
||
"""创建硬件"""
|
||
try:
|
||
uid = data.get("uid")
|
||
hardware_data = data.get("hardwares", {})
|
||
|
||
with safe_blender_operation(f"创建硬件 uid={uid}"):
|
||
created_hardware = {}
|
||
|
||
for hw_id, hw_data in hardware_data.items():
|
||
try:
|
||
# 创建硬件集合
|
||
collection_name = f"Hardware_{uid}_{hw_id}"
|
||
if BLENDER_AVAILABLE and collection_name not in bpy.data.collections:
|
||
collection = bpy.data.collections.new(
|
||
collection_name)
|
||
bpy.context.scene.collection.children.link(
|
||
collection)
|
||
memory_manager.register_object(collection)
|
||
|
||
created_hardware[hw_id] = {
|
||
"collection_name": collection_name,
|
||
"data": hw_data,
|
||
"creation_time": time.time()
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建硬件失败 {hw_id}: {e}")
|
||
|
||
# 存储硬件数据
|
||
self.hardwares[uid] = created_hardware
|
||
logger.info(f"创建硬件成功: {uid}, {len(created_hardware)} 个硬件")
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建硬件操作失败: {e}")
|
||
|
||
def c04(self, data: Dict[str, Any]):
|
||
"""创建面和边 - 优化版本"""
|
||
try:
|
||
uid = data.get("uid")
|
||
parts_data = data.get("parts", {})
|
||
|
||
with safe_blender_operation(f"创建面和边 uid={uid}"):
|
||
# 批量创建优化
|
||
total_created = 0
|
||
|
||
for part_id, part_data in parts_data.items():
|
||
try:
|
||
# 获取零件集合
|
||
collection = None
|
||
if BLENDER_AVAILABLE:
|
||
collection_name = f"Part_{uid}_{part_id}"
|
||
if collection_name in bpy.data.collections:
|
||
collection = bpy.data.collections[collection_name]
|
||
|
||
# 创建面
|
||
surfaces = part_data.get("surfaces", [])
|
||
for surface in surfaces:
|
||
try:
|
||
obj = self.create_face(
|
||
collection or bpy.context.scene.collection,
|
||
surface,
|
||
surface.get("color"),
|
||
surface.get("scale"),
|
||
surface.get("angle"),
|
||
uid=uid,
|
||
pid=part_id
|
||
)
|
||
if obj:
|
||
total_created += 1
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建面失败: {e}")
|
||
|
||
# 创建边
|
||
edges = part_data.get("edges", [])
|
||
for edge in edges:
|
||
try:
|
||
self.create_edges(
|
||
collection or bpy.context.scene.collection,
|
||
[edge]
|
||
)
|
||
total_created += 1
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建边失败: {e}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"处理零件失败 {part_id}: {e}")
|
||
|
||
logger.info(f"创建面和边完成: {total_created} 个对象")
|
||
|
||
# 定期清理
|
||
if total_created > 50:
|
||
memory_manager.cleanup_orphaned_data()
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建面和边操作失败: {e}")
|
||
|
||
def create_edges(self, container: Any, edges_data: List) -> List:
|
||
"""创建边"""
|
||
try:
|
||
if not BLENDER_AVAILABLE:
|
||
return []
|
||
|
||
created_edges = []
|
||
|
||
for edge_data in edges_data:
|
||
try:
|
||
# 解析边的起点和终点
|
||
start_point = Point3d.parse(
|
||
edge_data.get("start", "0,0,0"))
|
||
end_point = Point3d.parse(edge_data.get("end", "0,0,0"))
|
||
|
||
if start_point and end_point:
|
||
edge_obj = self._create_line_edge(
|
||
container, start_point, end_point)
|
||
if edge_obj:
|
||
created_edges.append(edge_obj)
|
||
memory_manager.register_object(edge_obj)
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建单条边失败: {e}")
|
||
|
||
return created_edges
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建边失败: {e}")
|
||
return []
|
||
|
||
def _create_line_edge(self, container: Any, start: Point3d, end: Point3d) -> Any:
|
||
"""创建线边"""
|
||
try:
|
||
if not BLENDER_AVAILABLE:
|
||
return None
|
||
|
||
# 创建边网格
|
||
mesh_name = f"SUWEdge_{hash(str(start) + str(end)) % 10000}"
|
||
mesh = bpy.data.meshes.new(mesh_name)
|
||
memory_manager.register_mesh(mesh)
|
||
|
||
# 设置边的顶点和边
|
||
vertices = [
|
||
(start.x, start.y, start.z),
|
||
(end.x, end.y, end.z)
|
||
]
|
||
edges = [(0, 1)]
|
||
|
||
mesh.from_pydata(vertices, edges, [])
|
||
mesh.update()
|
||
|
||
# 创建对象
|
||
obj_name = f"SUWEdgeObj_{hash(str(start) + str(end)) % 10000}"
|
||
obj = bpy.data.objects.new(obj_name, mesh)
|
||
memory_manager.register_object(obj)
|
||
|
||
# 添加到集合
|
||
if hasattr(container, 'objects'):
|
||
container.objects.link(obj)
|
||
else:
|
||
bpy.context.scene.collection.objects.link(obj)
|
||
|
||
return obj
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建线边失败: {e}")
|
||
return None
|
||
|
||
# ==================== 选择和可见性管理 ====================
|
||
|
||
def sel_clear(self):
|
||
"""清除选择"""
|
||
try:
|
||
if BLENDER_AVAILABLE:
|
||
bpy.ops.object.select_all(action='DESELECT')
|
||
|
||
self._selected_uid = None
|
||
self._selected_obj = None
|
||
self._selected_zone = None
|
||
self._selected_part = None
|
||
|
||
logger.info("选择已清除")
|
||
|
||
except Exception as e:
|
||
logger.error(f"清除选择失败: {e}")
|
||
|
||
def sel_local(self, obj: Any):
|
||
"""选择本地对象"""
|
||
try:
|
||
if BLENDER_AVAILABLE and obj:
|
||
# 清除当前选择
|
||
bpy.ops.object.select_all(action='DESELECT')
|
||
|
||
# 选择指定对象
|
||
if obj.name in bpy.data.objects:
|
||
bpy.data.objects[obj.name].select_set(True)
|
||
bpy.context.view_layer.objects.active = bpy.data.objects[obj.name]
|
||
|
||
# 更新选择状态
|
||
self._selected_obj = obj.name # 存储名称而非引用
|
||
|
||
logger.info(f"选择对象: {obj.name}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"选择对象失败: {e}")
|
||
|
||
# ==================== 材质和纹理管理 ====================
|
||
|
||
def add_mat_rgb(self, mat_id: str, alpha: float, r: int, g: int, b: int):
|
||
"""添加RGB材质"""
|
||
try:
|
||
if not BLENDER_AVAILABLE:
|
||
return
|
||
|
||
# 检查材质是否已存在
|
||
if mat_id in self.material_cache:
|
||
material_name = self.material_cache[mat_id]
|
||
if material_name in bpy.data.materials:
|
||
return bpy.data.materials[material_name]
|
||
|
||
# 创建新材质
|
||
material = bpy.data.materials.new(mat_id)
|
||
material.use_nodes = True
|
||
|
||
# 设置颜色
|
||
if material.node_tree:
|
||
principled = material.node_tree.nodes.get("Principled BSDF")
|
||
if principled:
|
||
color = (r/255.0, g/255.0, b/255.0, alpha)
|
||
principled.inputs[0].default_value = color
|
||
|
||
# 设置透明度
|
||
if alpha < 1.0:
|
||
material.blend_method = 'BLEND'
|
||
# Alpha input
|
||
principled.inputs[21].default_value = alpha
|
||
|
||
# 缓存材质
|
||
self.material_cache[mat_id] = material.name
|
||
memory_manager.register_object(material)
|
||
|
||
logger.info(f"创建RGB材质: {mat_id}")
|
||
return material
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建RGB材质失败: {e}")
|
||
return None
|
||
|
||
# ==================== 系统监控和维护 ====================
|
||
|
||
def monitor_memory_usage(self):
|
||
"""监控内存使用情况"""
|
||
try:
|
||
if not BLENDER_AVAILABLE:
|
||
return
|
||
|
||
# 检查是否需要清理
|
||
if memory_manager.should_cleanup():
|
||
logger.info("触发定期内存清理")
|
||
cleanup_count = memory_manager.cleanup_orphaned_data()
|
||
|
||
if cleanup_count > 0:
|
||
logger.info(f"清理了 {cleanup_count} 个孤立数据")
|
||
gc.collect()
|
||
|
||
# 检查内存压力
|
||
total_objects = len(bpy.data.objects)
|
||
total_meshes = len(bpy.data.meshes)
|
||
|
||
if total_objects > 10000 or total_meshes > 5000:
|
||
logger.warning(
|
||
f"内存压力警告: {total_objects} 个对象, {total_meshes} 个网格")
|
||
|
||
# 执行强制清理
|
||
self.force_cleanup()
|
||
|
||
except Exception as e:
|
||
logger.error(f"内存监控失败: {e}")
|
||
|
||
def get_memory_report(self) -> Dict[str, Any]:
|
||
"""获取内存报告"""
|
||
try:
|
||
report = {
|
||
"timestamp": time.time(),
|
||
"creation_stats": self.get_creation_stats(),
|
||
"system_diagnosis": self.diagnose_system_state(),
|
||
"memory_manager": {
|
||
"object_registry_size": len(memory_manager.object_registry),
|
||
"mesh_registry_size": len(memory_manager.mesh_registry),
|
||
"last_cleanup": memory_manager.last_cleanup_time,
|
||
"cleanup_interval": memory_manager.cleanup_interval
|
||
}
|
||
}
|
||
|
||
if BLENDER_AVAILABLE:
|
||
report["blender_data"] = {
|
||
"objects": len(bpy.data.objects),
|
||
"meshes": len(bpy.data.meshes),
|
||
"materials": len(bpy.data.materials),
|
||
"textures": len(bpy.data.textures),
|
||
"images": len(bpy.data.images)
|
||
}
|
||
|
||
return report
|
||
|
||
except Exception as e:
|
||
logger.error(f"生成内存报告失败: {e}")
|
||
return {"error": str(e)}
|
||
|
||
# ==================== 属性访问器 ====================
|
||
|
||
@property
|
||
def selected_uid(self):
|
||
return self._selected_uid
|
||
|
||
@property
|
||
def selected_zone(self):
|
||
return self._selected_zone
|
||
|
||
@property
|
||
def selected_part(self):
|
||
return self._selected_part
|
||
|
||
@property
|
||
def selected_obj(self):
|
||
# 返回对象名称而非引用,避免悬空引用
|
||
return self._selected_obj
|
||
|
||
@property
|
||
def server_path(self):
|
||
return self._server_path
|
||
|
||
@property
|
||
def default_zone(self):
|
||
return self._default_zone
|
||
|
||
# ==================== 兼容性方法存根 ====================
|
||
|
||
def c05(self, data: Dict[str, Any]):
|
||
"""业务方法存根 - 保持兼容性"""
|
||
logger.info("执行c05操作")
|
||
|
||
def c06(self, data: Dict[str, Any]):
|
||
"""业务方法存根 - 保持兼容性"""
|
||
logger.info("执行c06操作")
|
||
|
||
def c07(self, data: Dict[str, Any]):
|
||
"""业务方法存根 - 保持兼容性"""
|
||
logger.info("执行c07操作")
|
||
|
||
def c08(self, data: Dict[str, Any]):
|
||
"""业务方法存根 - 保持兼容性"""
|
||
logger.info("执行c08操作")
|
||
|
||
def c10(self, data: Dict[str, Any]):
|
||
"""设置门信息"""
|
||
try:
|
||
uid = data.get("uid")
|
||
doors = data.get("drs", [])
|
||
|
||
logger.info(f"设置门信息: uid={uid}, {len(doors)} 个门")
|
||
|
||
# 存储门信息
|
||
if uid not in self.parts:
|
||
self.parts[uid] = {}
|
||
|
||
self.parts[uid]["doors"] = doors
|
||
|
||
except Exception as e:
|
||
logger.error(f"设置门信息失败: {e}")
|
||
|
||
def show_message(self, data: Dict[str, Any]):
|
||
"""显示消息"""
|
||
try:
|
||
message = data.get("message", "")
|
||
logger.info(f"显示消息: {message}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"显示消息失败: {e}")
|
||
|
||
def set_config(self, data: Dict[str, Any]):
|
||
"""设置配置"""
|
||
try:
|
||
config = data.get("config", {})
|
||
logger.info(f"设置配置: {len(config)} 个配置项")
|
||
|
||
# 应用内存管理相关配置
|
||
if "memory_cleanup_interval" in config:
|
||
memory_manager.cleanup_interval = config["memory_cleanup_interval"]
|
||
|
||
if "batch_size" in config:
|
||
self.batch_size = config["batch_size"]
|
||
|
||
except Exception as e:
|
||
logger.error(f"设置配置失败: {e}")
|
||
|
||
# ==================== 清理和销毁 ====================
|
||
|
||
def shutdown(self):
|
||
"""关闭系统"""
|
||
try:
|
||
logger.info("开始关闭SUWood系统")
|
||
|
||
# 执行最终清理
|
||
self.force_cleanup()
|
||
|
||
# 清理所有缓存
|
||
self.mesh_cache.clear()
|
||
self.material_cache.clear()
|
||
self.object_references.clear()
|
||
|
||
# 清理数据结构
|
||
self.parts.clear()
|
||
self.zones.clear()
|
||
self.hardwares.clear()
|
||
|
||
logger.info("✅ SUWood系统关闭完成")
|
||
|
||
except Exception as e:
|
||
logger.error(f"关闭系统失败: {e}")
|
||
|
||
def __del__(self):
|
||
"""析构函数"""
|
||
try:
|
||
self.shutdown()
|
||
except:
|
||
pass
|
||
|
||
# ==================== 模块级别的便利函数 ====================
|
||
|
||
|
||
def create_suw_instance():
|
||
"""创建SUW实例的便利函数"""
|
||
return SUWImpl.get_instance()
|
||
|
||
|
||
def cleanup_blender_memory():
|
||
"""清理Blender内存的便利函数"""
|
||
try:
|
||
if BLENDER_AVAILABLE:
|
||
cleanup_count = memory_manager.cleanup_orphaned_data()
|
||
gc.collect()
|
||
logger.info(f"清理了 {cleanup_count} 个孤立数据")
|
||
return cleanup_count
|
||
return 0
|
||
except Exception as e:
|
||
logger.error(f"清理内存失败: {e}")
|
||
return 0
|
||
|
||
|
||
def get_memory_usage_summary():
|
||
"""获取内存使用摘要"""
|
||
try:
|
||
if BLENDER_AVAILABLE:
|
||
return {
|
||
"objects": len(bpy.data.objects),
|
||
"meshes": len(bpy.data.meshes),
|
||
"materials": len(bpy.data.materials),
|
||
"memory_manager_stats": memory_manager.creation_stats.copy()
|
||
}
|
||
return {"blender_available": False}
|
||
except Exception as e:
|
||
logger.error(f"获取内存使用摘要失败: {e}")
|
||
return {"error": str(e)}
|
||
|
||
# ==================== 主程序入口 ====================
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# 测试代码
|
||
try:
|
||
logger.info("开始测试SUWImpl内存管理")
|
||
|
||
# 创建实例
|
||
suw = create_suw_instance()
|
||
suw.startup()
|
||
|
||
# 获取内存报告
|
||
report = suw.get_memory_report()
|
||
logger.info(f"内存报告: {report}")
|
||
|
||
# 执行诊断
|
||
issues = suw.diagnose_system_state()
|
||
if not issues:
|
||
logger.info("✅ 系统状态正常")
|
||
|
||
# 清理测试
|
||
cleanup_count = cleanup_blender_memory()
|
||
logger.info(f"清理测试完成: {cleanup_count}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"测试失败: {e}")
|
||
|
||
finally:
|
||
logger.info("测试完成")
|