#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ SUW Core - Machining Manager Module 拆分自: suw_impl.py (Line 2292-3500, 4790-4990) 用途: Blender加工管理、几何体创建、布尔运算 版本: 1.0.0 作者: SUWood Team """ from .material_manager import material_manager from .memory_manager import memory_manager from .data_manager import data_manager, get_data_manager import time import logging import threading from typing import Dict, Any, List, Optional # 设置日志 logger = logging.getLogger(__name__) # 检查Blender可用性 try: import bpy import bmesh BLENDER_AVAILABLE = True except ImportError: BLENDER_AVAILABLE = False # 导入依赖模块 # ==================== 加工管理器类 ==================== class MachiningManager: """加工管理器 - 负责所有加工相关操作""" def __init__(self): """ 初始化加工管理器 - 完全独立,不依赖suw_impl """ # 使用全局数据管理器 self.data_manager = get_data_manager() # 加工数据存储 self.machinings = {} # 加工统计 self.machining_stats = { "machinings_created": 0, "trim_operations": 0, "creation_errors": 0 } logger.info("✅ 加工管理器初始化完成") # ==================== 原始命令方法 ==================== def c05(self, data: Dict[str, Any]): """创建加工 - 保持原始方法名和参数""" try: logger.info("🔧 执行c05命令: 创建加工") # 使用线程安全的方式创建加工 def create_machining_batch(): try: return self._create_machining_batch_impl(data) except Exception as e: logger.error(f"加工创建线程失败: {e}") return 0 # 直接执行 return create_machining_batch() except Exception as e: logger.error(f"c05命令执行失败: {e}") return 0 def c0a(self, data: Dict[str, Any]): """删除加工 - 保持原始方法名和参数""" try: logger.info("🗑️ 执行c0a命令: 删除加工") def delete_machining(): try: return self._delete_machining_impl(data) except Exception as e: logger.error(f"加工删除线程失败: {e}") return 0 # 直接执行 return delete_machining() except Exception as e: logger.error(f"c0a命令执行失败: {e}") return 0 # ==================== 核心实现方法 ==================== def _create_machining_batch_impl(self, data: Dict[str, Any]): """批量创建加工的核心实现""" try: if not BLENDER_AVAILABLE: logger.warning("Blender 不可用,跳过加工创建") return 0 uid = data.get("uid") items = data.get("items", []) logger.info(f"🔧 开始批量创建加工: uid={uid}, 项目数={len(items)}") # 获取部件和硬件集合 parts = self._get_parts(data) hardwares = self._get_hardwares(data) # 获取加工集合 machinings = self.data_manager.get_machinings(uid) # 分类处理:可视化加工 vs 布尔运算 visual_works = [] boolean_works = [] for i, work in enumerate(items): if work.get("cancel", 0) == 1: continue cp = work.get("cp") if not cp: continue # 获取组件 component = None if cp in parts: component = parts[cp] elif cp in hardwares: component = hardwares[cp] if not component or not self._is_object_valid(component): logger.info(f"🚨 组件查找失败: cp={cp}") continue work['component'] = component work['index'] = i if work.get("trim3d", 0) == 1: boolean_works.append(work) else: visual_works.append(work) created_count = 0 # 1. 批量处理可视化加工 if visual_works: created_count += self._create_visual_machining_batch( visual_works, uid) # 2. 批量处理布尔运算 if boolean_works: created_count += self._create_boolean_machining_batch( boolean_works) # 更新统计 self.machining_stats["machinings_created"] += created_count self.machining_stats["trim_operations"] += len(boolean_works) logger.info(f"📊 批量加工创建完成: {created_count}/{len(items)} 成功") return created_count except Exception as e: logger.error(f"❌ 批量创建加工失败: {e}") self.machining_stats["creation_errors"] += 1 return 0 def _create_visual_machining_batch(self, visual_works, uid): """批量创建可视化加工对象""" try: if not BLENDER_AVAILABLE: return 0 created_count = 0 # 按组件分组 component_groups = {} for work in visual_works: component = work['component'] if component not in component_groups: component_groups[component] = [] component_groups[component].append(work) for component, works in component_groups.items(): logger.info(f"🔨 为组件 {component.name} 批量创建 {len(works)} 个加工对象") # 创建主加工组 main_machining = bpy.data.objects.new( f"Machining_{component.name}", None) bpy.context.scene.collection.objects.link(main_machining) main_machining.parent = component main_machining["sw_typ"] = "work" self.data_manager.add_machining(uid, main_machining) # 使用bmesh批量创建几何体 bm = bmesh.new() for work in works: try: # 解析坐标 p1 = self._parse_point3d(work.get("p1", "(0,0,0)")) p2 = self._parse_point3d(work.get("p2", "(0,0,0)")) # 根据类型创建几何体 if "tri" in work: self._add_triangle_to_bmesh(bm, work, p1, p2) elif "surf" in work: self._add_surface_to_bmesh(bm, work, p1, p2) else: self._add_circle_to_bmesh(bm, work, p1, p2) created_count += 1 except Exception as e: logger.error(f"创建单个加工几何体失败: {e}") # 创建网格 if bm.verts: mesh = bpy.data.meshes.new( f"MachiningMesh_{component.name}") bm.to_mesh(mesh) mesh.update() # 创建对象 mesh_obj = bpy.data.objects.new( f"MachiningGeometry_{component.name}", mesh) bpy.context.scene.collection.objects.link(mesh_obj) mesh_obj.parent = main_machining # 应用材质 self._set_machining_color(mesh_obj, works[0]) bm.free() return created_count except Exception as e: logger.error(f"批量创建可视化加工失败: {e}") return 0 def _create_boolean_machining_batch(self, boolean_works): """批量创建布尔运算加工""" try: if not BLENDER_AVAILABLE: return 0 created_count = 0 # 按类型分组 circle_data_list = [] triangle_data_list = [] surface_data_list = [] for work in boolean_works: try: p1 = self._parse_point3d(work.get("p1", "(0,0,0)")) p2 = self._parse_point3d(work.get("p2", "(0,0,0)")) component = work['component'] # 检查是否被剪切 if self._work_trimmed(component, work): continue if "tri" in work: tri_data = self._create_triangle_trimmer_data( work, p1, p2) if tri_data: triangle_data_list.append(tri_data) elif "surf" in work: surf_data = self._create_surface_trimmer_data( work, p1, p2) if surf_data: surface_data_list.append(surf_data) else: circle_data = self._create_circle_trimmer_data( work, p1, p2) if circle_data: circle_data_list.append(circle_data) except Exception as e: logger.error(f"处理布尔加工项失败: {e}") # 创建统一剪切器 if circle_data_list: unified_trimmer = self._create_unified_circle_trimmer( circle_data_list, "CircleTrimmer") if unified_trimmer: created_count += len(circle_data_list) if triangle_data_list: unified_trimmer = self._create_unified_triangle_trimmer( triangle_data_list, "TriangleTrimmer") if unified_trimmer: created_count += len(triangle_data_list) if surface_data_list: unified_trimmer = self._create_unified_surface_trimmer( surface_data_list, "SurfaceTrimmer") if unified_trimmer: created_count += len(surface_data_list) return created_count except Exception as e: logger.error(f"批量创建布尔加工失败: {e}") return 0 def _delete_machining_impl(self, data: Dict[str, Any]): """删除加工的核心实现""" try: uid = data.get("uid") items = data.get("items", []) machinings = self.data_manager.get_machinings(uid) if not machinings: logger.info(f"未找到单元 {uid} 的加工数据") return 0 # 分类删除 visual_machinings = [] boolean_machinings = [] for item in items: index = item.get("index", -1) if 0 <= index < len(machinings): machining = machinings[index] if machining and self._is_object_valid(machining): if item.get("trim3d", 0) == 1: boolean_machinings.append(machining) else: visual_machinings.append(machining) deleted_count = 0 # 删除可视化加工 if visual_machinings: deleted_count += self._delete_visual_machining_batch( visual_machinings) # 删除布尔加工 if boolean_machinings: deleted_count += self._delete_boolean_machining_batch( boolean_machinings) # 更新统计 self.machining_stats["deletion_count"] += deleted_count logger.info(f"删除加工完成: {deleted_count} 个") return deleted_count except Exception as e: logger.error(f"删除加工失败: {e}") return 0 # ==================== 几何体创建方法 ==================== def _add_circle_to_bmesh(self, bm, work, p1, p2): """添加圆形到bmesh""" try: # 计算圆心和半径 center = [(p1[0] + p2[0]) / 2, (p1[1] + p2[1]) / 2, (p1[2] + p2[2]) / 2] radius = ((p2[0] - p1[0])**2 + (p2[1] - p1[1]) ** 2 + (p2[2] - p1[2])**2)**0.5 / 2 # 创建圆形 bmesh.ops.create_circle( bm, cap_ends=True, radius=radius, segments=16) # 移动到正确位置 for vert in bm.verts: vert.co.x += center[0] vert.co.y += center[1] vert.co.z += center[2] except Exception as e: logger.error(f"添加圆形到bmesh失败: {e}") def _add_triangle_to_bmesh(self, bm, work, p1, p2): """添加三角形到bmesh""" try: # 创建三角形顶点 v1 = bm.verts.new(p1) v2 = bm.verts.new(p2) v3 = bm.verts.new([(p1[0] + p2[0])/2, p1[1], (p1[2] + p2[2])/2]) # 创建面 bm.faces.new([v1, v2, v3]) except Exception as e: logger.error(f"添加三角形到bmesh失败: {e}") def _add_surface_to_bmesh(self, bm, work, p1, p2): """添加表面到bmesh""" try: # 解析表面顶点 surf = work.get("surf", "") vertices = self._parse_surface_vertices(surf) if len(vertices) >= 3: # 创建顶点 bm_verts = [] for vertex in vertices: bm_verts.append(bm.verts.new(vertex)) # 创建面 if len(bm_verts) >= 3: bm.faces.new(bm_verts) except Exception as e: logger.error(f"添加表面到bmesh失败: {e}") def _create_machining_visual(self, component, work, index): """创建加工可视化对象""" try: if not BLENDER_AVAILABLE: return None machining_name = f"Machining_{component.name}_{index}" # 解析坐标 p1 = self._parse_point3d(work.get("p1", "(0,0,0)")) p2 = self._parse_point3d(work.get("p2", "(0,0,0)")) # 创建几何体 if "tri" in work: return self._create_triangle_machining(machining_name, work, p1, p2) elif "surf" in work: return self._create_surface_machining(machining_name, work, p1, p2) else: return self._create_circle_machining(machining_name, work, p1, p2) except Exception as e: logger.error(f"创建加工可视化失败: {e}") return None def _create_triangle_machining(self, name, work, p1, p2): """创建三角形加工""" try: mesh = bpy.data.meshes.new(name) vertices = [p1, p2, [(p1[0] + p2[0])/2, p1[1], (p1[2] + p2[2])/2]] faces = [(0, 1, 2)] mesh.from_pydata(vertices, [], faces) mesh.update() obj = bpy.data.objects.new(name, mesh) bpy.context.scene.collection.objects.link(obj) return obj except Exception as e: logger.error(f"创建三角形加工失败: {e}") return None def _create_surface_machining(self, name, work, p1, p2): """创建表面加工""" try: surf = work.get("surf", "") vertices = self._parse_surface_vertices(surf) if len(vertices) < 3: return None mesh = bpy.data.meshes.new(name) faces = [list(range(len(vertices)))] mesh.from_pydata(vertices, [], faces) mesh.update() obj = bpy.data.objects.new(name, mesh) bpy.context.scene.collection.objects.link(obj) return obj except Exception as e: logger.error(f"创建表面加工失败: {e}") return None def _create_circle_machining(self, name, work, p1, p2): """创建圆形加工""" try: # 计算圆心和半径 center = [(p1[0] + p2[0]) / 2, (p1[1] + p2[1]) / 2, (p1[2] + p2[2]) / 2] radius = ((p2[0] - p1[0])**2 + (p2[1] - p1[1]) ** 2 + (p2[2] - p1[2])**2)**0.5 / 2 # 创建圆形网格 bpy.ops.mesh.primitive_circle_add(radius=radius, location=center) obj = bpy.context.active_object obj.name = name return obj except Exception as e: logger.error(f"创建圆形加工失败: {e}") return None # ==================== 布尔运算相关方法 ==================== def _apply_fast_boolean(self, board, trimmer, work): """应用快速布尔运算""" try: if not BLENDER_AVAILABLE or not board or not trimmer: return False # 使用Blender的布尔修改器 boolean_modifier = board.modifiers.new( name="Boolean", type='BOOLEAN') boolean_modifier.operation = 'DIFFERENCE' boolean_modifier.object = trimmer # 应用修改器 bpy.context.view_layer.objects.active = board bpy.ops.object.modifier_apply(modifier=boolean_modifier.name) return True except Exception as e: logger.error(f"应用布尔运算失败: {e}") return False def _work_trimmed(self, component, work): """检查工作是否被剪切""" try: # 简化的检查逻辑 return False except Exception as e: logger.error(f"检查剪切状态失败: {e}") return False def _create_unified_circle_trimmer(self, circle_data_list, component_name): """创建统一圆形剪切器""" try: if not circle_data_list: return None # 创建合并的圆形剪切器 bm = bmesh.new() for circle_data in circle_data_list: # 添加圆形几何体到bmesh center = circle_data.get("center", [0, 0, 0]) radius = circle_data.get("radius", 1.0) bmesh.ops.create_circle( bm, cap_ends=True, radius=radius, segments=16) # 移动到正确位置 for vert in bm.verts[-16:]: # 最后16个顶点 vert.co.x += center[0] vert.co.y += center[1] vert.co.z += center[2] # 创建网格 mesh = bpy.data.meshes.new( f"UnifiedCircleTrimmer_{component_name}") bm.to_mesh(mesh) mesh.update() obj = bpy.data.objects.new( f"UnifiedCircleTrimmer_{component_name}", mesh) bpy.context.scene.collection.objects.link(obj) bm.free() return obj except Exception as e: logger.error(f"创建统一圆形剪切器失败: {e}") return None def _create_unified_triangle_trimmer(self, triangle_data_list, component_name): """创建统一三角形剪切器""" try: if not triangle_data_list: return None # 简化实现 return None except Exception as e: logger.error(f"创建统一三角形剪切器失败: {e}") return None def _create_unified_surface_trimmer(self, surface_data_list, component_name): """创建统一表面剪切器""" try: if not surface_data_list: return None # 简化实现 return None except Exception as e: logger.error(f"创建统一表面剪切器失败: {e}") return None def _create_circle_trimmer_data(self, work, p1, p2): """创建圆形剪切器数据""" try: center = [(p1[0] + p2[0]) / 2, (p1[1] + p2[1]) / 2, (p1[2] + p2[2]) / 2] radius = ((p2[0] - p1[0])**2 + (p2[1] - p1[1]) ** 2 + (p2[2] - p1[2])**2)**0.5 / 2 return { "center": center, "radius": radius, "work": work } except Exception as e: logger.error(f"创建圆形剪切器数据失败: {e}") return None def _create_triangle_trimmer_data(self, work, p1, p2): """创建三角形剪切器数据""" try: return { "p1": p1, "p2": p2, "work": work } except Exception as e: logger.error(f"创建三角形剪切器数据失败: {e}") return None def _create_surface_trimmer_data(self, work, p1, p2): """创建表面剪切器数据""" try: return { "p1": p1, "p2": p2, "work": work } except Exception as e: logger.error(f"创建表面剪切器数据失败: {e}") return None def _delete_visual_machining_batch(self, visual_machinings): """删除可视化加工批次""" try: deleted_count = 0 for machining in visual_machinings: if self._delete_object_safe(machining): deleted_count += 1 return deleted_count except Exception as e: logger.error(f"删除可视化加工批次失败: {e}") return 0 def _delete_boolean_machining_batch(self, boolean_machinings): """删除布尔加工批次""" try: deleted_count = 0 for machining in boolean_machinings: if self._delete_object_safe(machining): deleted_count += 1 return deleted_count except Exception as e: logger.error(f"删除布尔加工批次失败: {e}") return 0 # ==================== 辅助方法 ==================== def _parse_point3d(self, point_str): """解析3D点字符串""" try: # 移除括号和空格 point_str = point_str.strip("()").replace(" ", "") coords = point_str.split(",") if len(coords) >= 3: return [float(coords[0]), float(coords[1]), float(coords[2])] else: return [0.0, 0.0, 0.0] except Exception as e: logger.error(f"解析3D点失败: {e}") return [0.0, 0.0, 0.0] def _parse_surface_vertices(self, surface): """解析表面顶点""" try: # 简化的表面解析 vertices = [] # 这里应该根据实际的表面数据格式解析 return vertices except Exception as e: logger.error(f"解析表面顶点失败: {e}") return [] def _set_machining_color(self, machining, item): """设置加工颜色""" try: # 获取加工材质 material = material_manager.get_texture("mat_machining") if material and hasattr(machining, 'data') and machining.data: if not machining.data.materials: machining.data.materials.append(material) else: machining.data.materials[0] = material except Exception as e: logger.error(f"设置加工颜色失败: {e}") def _is_object_valid(self, obj) -> bool: """检查对象是否有效""" try: if not obj: return False if not BLENDER_AVAILABLE: return True return obj.name in bpy.data.objects except: return False def _delete_object_safe(self, obj) -> bool: """安全删除对象""" try: if not obj or not BLENDER_AVAILABLE: return False if obj.name in bpy.data.objects: bpy.data.objects.remove(obj, do_unlink=True) return True return False except Exception as e: logger.error(f"删除对象失败: {e}") return False def _get_parts(self, data: Dict[str, Any]) -> Dict[str, Any]: """获取部件数据 - 使用data_manager""" return self.data_manager.get_parts(data) def _get_hardwares(self, data: Dict[str, Any]) -> Dict[str, Any]: """获取硬件数据 - 使用data_manager""" return self.data_manager.get_hardwares(data) # ==================== 统计和管理方法 ==================== def get_machining_stats(self) -> Dict[str, Any]: """获取加工统计信息""" try: total_machinings = sum(len(self.data_manager.get_machinings(uid)) for uid in self.data_manager.machinings.keys()) stats = { "total_units": len(self.data_manager.machinings), "total_machinings": total_machinings, "creation_stats": self.machining_stats.copy(), "memory_usage": { "machinings_dict_size": len(self.data_manager.machinings), } } if BLENDER_AVAILABLE: stats["blender_objects"] = len([obj for obj in bpy.data.objects if obj.get("sw_typ") == "work"]) return stats except Exception as e: logger.error(f"获取加工统计失败: {e}") return {"error": str(e)} def get_creation_stats(self) -> Dict[str, Any]: """获取创建统计信息""" return self.machining_stats.copy() def reset_creation_stats(self): """重置创建统计""" self.machining_stats = { "machinings_created": 0, "trim_operations": 0, "creation_errors": 0 } logger.info("加工统计已重置") def cleanup(self): """清理加工管理器""" try: # 清理所有加工数据 total_deleted = 0 for uid in list(self.data_manager.machinings.keys()): machinings = self.data_manager.get_machinings(uid) for machining in machinings: if self._delete_object_safe(machining): total_deleted += 1 self.data_manager.clear_machinings(uid) self.reset_creation_stats() logger.info(f"✅ 加工管理器清理完成,删除了 {total_deleted} 个对象") except Exception as e: logger.error(f"清理加工管理器失败: {e}") # ==================== 全局加工管理器实例 ==================== # 全局实例 machining_manager = None def init_machining_manager(): """初始化全局加工管理器实例 - 不再需要suw_impl参数""" global machining_manager machining_manager = MachiningManager() return machining_manager def get_machining_manager(): """获取全局加工管理器实例""" global machining_manager if machining_manager is None: machining_manager = init_machining_manager() return machining_manager