From ca6a73c3be915f67d2365450856219f73e28a53e Mon Sep 17 00:00:00 2001 From: Minidoracat Date: Sat, 7 Jun 2025 02:19:26 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20=E6=96=B0=E5=A2=9E=E7=B5=B1?= =?UTF-8?q?=E4=B8=80=E9=8C=AF=E8=AA=A4=E8=99=95=E7=90=86=E6=A1=86=E6=9E=B6?= =?UTF-8?q?=E8=88=87=E8=B3=87=E6=BA=90=E7=AE=A1=E7=90=86=E5=99=A8=EF=BC=8C?= =?UTF-8?q?=E5=84=AA=E5=8C=96=E8=87=A8=E6=99=82=E6=96=87=E4=BB=B6=E7=AE=A1?= =?UTF-8?q?=E7=90=86=E8=88=87=E7=AB=AF=E5=8F=A3=E6=9F=A5=E6=89=BE=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=EF=BC=8C=E6=8F=90=E5=8D=87=E7=B3=BB=E7=B5=B1=E7=A9=A9?= =?UTF-8?q?=E5=AE=9A=E6=80=A7=E8=88=87=E5=8F=AF=E7=B6=AD=E8=AD=B7=E6=80=A7?= =?UTF-8?q?=E3=80=82=E6=9B=B4=E6=96=B0=20Web=20UI=20=E7=AB=AF=E5=8F=A3?= =?UTF-8?q?=E7=AE=A1=E7=90=86=E9=82=8F=E8=BC=AF=EF=BC=8C=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E8=87=AA=E5=8B=95=E6=B8=85=E7=90=86=E8=88=87=E9=80=B2=E7=A8=8B?= =?UTF-8?q?=E8=A8=BB=E5=86=8A=EF=BC=8C=E4=B8=A6=E5=A2=9E=E5=BC=B7=E5=A3=93?= =?UTF-8?q?=E7=B8=AE=E9=85=8D=E7=BD=AE=E7=AE=A1=E7=90=86=E3=80=82=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=E6=B8=AC=E8=A9=A6=E6=A8=A1=E7=B5=84=E4=BB=A5=E7=A2=BA?= =?UTF-8?q?=E4=BF=9D=E5=8A=9F=E8=83=BD=E6=AD=A3=E7=A2=BA=E6=80=A7=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gui/locales/en/translations.json | 41 +- .../gui/locales/zh-CN/translations.json | 41 +- .../gui/locales/zh-TW/translations.json | 41 +- .../gui/widgets/image_upload.py | 19 +- src/mcp_feedback_enhanced/server.py | 65 +- src/mcp_feedback_enhanced/test_web_ui.py | 16 +- src/mcp_feedback_enhanced/testing/utils.py | 29 +- src/mcp_feedback_enhanced/utils/__init__.py | 27 + .../utils/error_handler.py | 455 +++++++++++ .../utils/resource_manager.py | 719 ++++++++++++++++++ src/mcp_feedback_enhanced/web/main.py | 95 ++- .../web/models/feedback_session.py | 16 +- .../web/utils/compression_config.py | 187 +++++ .../web/utils/compression_monitor.py | 290 +++++++ .../web/utils/port_manager.py | 307 ++++++++ tests/test_error_handler.py | 253 ++++++ tests/test_gzip_compression.py | 346 +++++++++ tests/test_port_manager.py | 249 ++++++ tests/test_resource_manager.py | 394 ++++++++++ 19 files changed, 3543 insertions(+), 47 deletions(-) create mode 100644 src/mcp_feedback_enhanced/utils/__init__.py create mode 100644 src/mcp_feedback_enhanced/utils/error_handler.py create mode 100644 src/mcp_feedback_enhanced/utils/resource_manager.py create mode 100644 src/mcp_feedback_enhanced/web/utils/compression_config.py create mode 100644 src/mcp_feedback_enhanced/web/utils/compression_monitor.py create mode 100644 src/mcp_feedback_enhanced/web/utils/port_manager.py create mode 100644 tests/test_error_handler.py create mode 100644 tests/test_gzip_compression.py create mode 100644 tests/test_port_manager.py create mode 100644 tests/test_resource_manager.py diff --git a/src/mcp_feedback_enhanced/gui/locales/en/translations.json b/src/mcp_feedback_enhanced/gui/locales/en/translations.json index a1e8819..ebde0b5 100644 --- a/src/mcp_feedback_enhanced/gui/locales/en/translations.json +++ b/src/mcp_feedback_enhanced/gui/locales/en/translations.json @@ -192,7 +192,46 @@ "confirmClearAll": "Are you sure you want to clear all {count} images?", "confirmClearTitle": "Confirm Clear", "fileSizeExceeded": "Image {filename} size is {size}MB, exceeding 1MB limit!\nRecommend using image editing software to compress before uploading.", - "dataSizeExceeded": "Image {filename} data size exceeds 1MB limit!" + "dataSizeExceeded": "Image {filename} data size exceeds 1MB limit!", + "types": { + "network": "Network connection issue", + "file_io": "File read/write issue", + "process": "Process execution issue", + "timeout": "Operation timeout", + "user_cancel": "User cancelled the operation", + "system": "System issue", + "permission": "Insufficient permissions", + "validation": "Data validation failed", + "dependency": "Dependency issue", + "config": "Configuration issue" + }, + "solutions": { + "network": [ + "Check network connection", + "Verify firewall settings", + "Try restarting the application" + ], + "file_io": [ + "Check if file exists", + "Verify file permissions", + "Check available disk space" + ], + "process": [ + "Check if process is running", + "Verify system resources", + "Try restarting related services" + ], + "timeout": [ + "Increase timeout settings", + "Check network latency", + "Retry the operation later" + ], + "permission": [ + "Run as administrator", + "Check file/directory permissions", + "Contact system administrator" + ] + } }, "languageSelector": "🌐 Language", "languageNames": { diff --git a/src/mcp_feedback_enhanced/gui/locales/zh-CN/translations.json b/src/mcp_feedback_enhanced/gui/locales/zh-CN/translations.json index 2f30086..f415dd9 100644 --- a/src/mcp_feedback_enhanced/gui/locales/zh-CN/translations.json +++ b/src/mcp_feedback_enhanced/gui/locales/zh-CN/translations.json @@ -172,7 +172,46 @@ "confirmClearAll": "确定要清除所有 {count} 张图片吗?", "confirmClearTitle": "确认清除", "fileSizeExceeded": "图片 {filename} 大小为 {size}MB,超过 1MB 限制!\n建议使用图片编辑软件压缩后再上传。", - "dataSizeExceeded": "图片 {filename} 数据大小超过 1MB 限制!" + "dataSizeExceeded": "图片 {filename} 数据大小超过 1MB 限制!", + "types": { + "network": "网络连接出现问题", + "file_io": "文件读写出现问题", + "process": "进程执行出现问题", + "timeout": "操作超时", + "user_cancel": "用户取消了操作", + "system": "系统出现问题", + "permission": "权限不足", + "validation": "数据验证失败", + "dependency": "依赖组件出现问题", + "config": "配置出现问题" + }, + "solutions": { + "network": [ + "检查网络连接是否正常", + "确认防火墙设置", + "尝试重新启动应用程序" + ], + "file_io": [ + "检查文件是否存在", + "确认文件权限", + "检查磁盘空间是否足够" + ], + "process": [ + "检查进程是否正在运行", + "确认系统资源是否足够", + "尝试重新启动相关服务" + ], + "timeout": [ + "增加超时时间设置", + "检查网络延迟", + "稍后重试操作" + ], + "permission": [ + "以管理员身份运行", + "检查文件/目录权限", + "联系系统管理员" + ] + } }, "aiSummary": "AI 工作摘要", "languageSelector": "🌐 语言选择", diff --git a/src/mcp_feedback_enhanced/gui/locales/zh-TW/translations.json b/src/mcp_feedback_enhanced/gui/locales/zh-TW/translations.json index d134d75..80dc117 100644 --- a/src/mcp_feedback_enhanced/gui/locales/zh-TW/translations.json +++ b/src/mcp_feedback_enhanced/gui/locales/zh-TW/translations.json @@ -188,7 +188,46 @@ "confirmClearAll": "確定要清除所有 {count} 張圖片嗎?", "confirmClearTitle": "確認清除", "fileSizeExceeded": "圖片 {filename} 大小為 {size}MB,超過 1MB 限制!\n建議使用圖片編輯軟體壓縮後再上傳。", - "dataSizeExceeded": "圖片 {filename} 數據大小超過 1MB 限制!" + "dataSizeExceeded": "圖片 {filename} 數據大小超過 1MB 限制!", + "types": { + "network": "網絡連接出現問題", + "file_io": "文件讀寫出現問題", + "process": "進程執行出現問題", + "timeout": "操作超時", + "user_cancel": "用戶取消了操作", + "system": "系統出現問題", + "permission": "權限不足", + "validation": "數據驗證失敗", + "dependency": "依賴組件出現問題", + "config": "配置出現問題" + }, + "solutions": { + "network": [ + "檢查網絡連接是否正常", + "確認防火牆設置", + "嘗試重新啟動應用程序" + ], + "file_io": [ + "檢查文件是否存在", + "確認文件權限", + "檢查磁盤空間是否足夠" + ], + "process": [ + "檢查進程是否正在運行", + "確認系統資源是否足夠", + "嘗試重新啟動相關服務" + ], + "timeout": [ + "增加超時時間設置", + "檢查網絡延遲", + "稍後重試操作" + ], + "permission": [ + "以管理員身份運行", + "檢查文件/目錄權限", + "聯繫系統管理員" + ] + } }, "languageNames": { "zhTw": "繁體中文", diff --git a/src/mcp_feedback_enhanced/gui/widgets/image_upload.py b/src/mcp_feedback_enhanced/gui/widgets/image_upload.py index f2e79bf..88332a1 100644 --- a/src/mcp_feedback_enhanced/gui/widgets/image_upload.py +++ b/src/mcp_feedback_enhanced/gui/widgets/image_upload.py @@ -25,6 +25,7 @@ from PySide6.QtWidgets import QSizePolicy # 導入多語系支援 from ...i18n import t from ...debug import gui_debug_log as debug_log +from ...utils.resource_manager import get_resource_manager, create_temp_file from .image_preview import ImagePreviewWidget @@ -37,6 +38,7 @@ class ImageUploadWidget(QWidget): self.images: Dict[str, Dict[str, str]] = {} self.config_manager = config_manager self._last_paste_time = 0 # 添加最後貼上時間記錄 + self.resource_manager = get_resource_manager() # 獲取資源管理器 self._setup_ui() self.setAcceptDrops(True) # 啟動時清理舊的臨時文件 @@ -350,20 +352,19 @@ class ImageUploadWidget(QWidget): if mimeData.hasImage(): image = clipboard.image() if not image.isNull(): - # 創建一個唯一的臨時文件名 - temp_dir = Path.home() / ".cache" / "mcp-feedback-enhanced" - temp_dir.mkdir(parents=True, exist_ok=True) - - timestamp = int(time.time() * 1000) - temp_file = temp_dir / f"clipboard_{timestamp}_{uuid.uuid4().hex[:8]}.png" + # 使用資源管理器創建臨時文件 + temp_file = create_temp_file( + suffix=".png", + prefix=f"clipboard_{int(time.time() * 1000)}_" + ) # 保存剪貼板圖片 - if image.save(str(temp_file), "PNG"): + if image.save(temp_file, "PNG"): if os.path.getsize(temp_file) > 0: - self._add_images([str(temp_file)]) + self._add_images([temp_file]) debug_log(f"從剪貼板成功粘貼圖片: {temp_file}") else: - QMessageBox.warning(self, t('errors.warning'), t('errors.imageSaveEmpty', path=str(temp_file))) + QMessageBox.warning(self, t('errors.warning'), t('errors.imageSaveEmpty', path=temp_file)) else: QMessageBox.warning(self, t('errors.warning'), t('errors.imageSaveFailed')) else: diff --git a/src/mcp_feedback_enhanced/server.py b/src/mcp_feedback_enhanced/server.py index c0002e0..c1d5606 100644 --- a/src/mcp_feedback_enhanced/server.py +++ b/src/mcp_feedback_enhanced/server.py @@ -38,6 +38,12 @@ from .i18n import get_i18n_manager # 導入統一的調試功能 from .debug import server_debug_log as debug_log +# 導入錯誤處理框架 +from .utils.error_handler import ErrorHandler, ErrorType + +# 導入資源管理器 +from .utils.resource_manager import get_resource_manager, create_temp_file + # ===== 編碼初始化 ===== def init_encoding(): """初始化編碼設置,確保正確處理中文字符""" @@ -228,8 +234,8 @@ def save_feedback_to_file(feedback_data: dict, file_path: str = None) -> str: str: 儲存的文件路徑 """ if file_path is None: - temp_fd, file_path = tempfile.mkstemp(suffix='.json', prefix='feedback_') - os.close(temp_fd) + # 使用資源管理器創建臨時文件 + file_path = create_temp_file(suffix='.json', prefix='feedback_') # 確保目錄存在 directory = os.path.dirname(file_path) @@ -401,9 +407,13 @@ def process_images(images_data: List[dict]) -> List[MCPImage]: debug_log(f"圖片 {i} ({file_name}) 處理成功,格式: {image_format}") except Exception as e: - debug_log(f"圖片 {i} 處理失敗: {e}") - import traceback - debug_log(f"詳細錯誤: {traceback.format_exc()}") + # 使用統一錯誤處理(不影響 JSON RPC) + error_id = ErrorHandler.log_error_with_context( + e, + context={"operation": "圖片處理", "image_index": i}, + error_type=ErrorType.FILE_IO + ) + debug_log(f"圖片 {i} 處理失敗 [錯誤ID: {error_id}]: {e}") debug_log(f"共處理 {len(mcp_images)} 張圖片") return mcp_images @@ -539,9 +549,18 @@ async def interactive_feedback( return feedback_items except Exception as e: - error_msg = f"回饋收集錯誤: {str(e)}" - debug_log(f"錯誤: {error_msg}") - return [TextContent(type="text", text=error_msg)] + # 使用統一錯誤處理,但不影響 JSON RPC 響應 + error_id = ErrorHandler.log_error_with_context( + e, + context={"operation": "回饋收集", "project_dir": project_dir}, + error_type=ErrorType.SYSTEM + ) + + # 生成用戶友好的錯誤信息 + user_error_msg = ErrorHandler.format_user_error(e, include_technical=False) + debug_log(f"回饋收集錯誤 [錯誤ID: {error_id}]: {str(e)}") + + return [TextContent(type="text", text=user_error_msg)] async def launch_web_ui_with_timeout(project_dir: str, summary: str, timeout: int) -> dict: @@ -565,10 +584,18 @@ async def launch_web_ui_with_timeout(project_dir: str, summary: str, timeout: in # 傳遞 timeout 參數給 Web UI return await launch_web_feedback_ui(project_dir, summary, timeout) except ImportError as e: - debug_log(f"無法導入 Web UI 模組: {e}") + # 使用統一錯誤處理 + error_id = ErrorHandler.log_error_with_context( + e, + context={"operation": "Web UI 模組導入", "module": "web"}, + error_type=ErrorType.DEPENDENCY + ) + user_error_msg = ErrorHandler.format_user_error(e, ErrorType.DEPENDENCY, include_technical=False) + debug_log(f"Web UI 模組導入失敗 [錯誤ID: {error_id}]: {e}") + return { "command_logs": "", - "interactive_feedback": f"Web UI 模組導入失敗: {str(e)}", + "interactive_feedback": user_error_msg, "images": [] } except TimeoutError as e: @@ -587,19 +614,31 @@ async def launch_web_ui_with_timeout(project_dir: str, summary: str, timeout: in "images": [] } except Exception as e: - error_msg = f"Web UI 錯誤: {e}" - debug_log(f"❌ {error_msg}") + # 使用統一錯誤處理 + error_id = ErrorHandler.log_error_with_context( + e, + context={"operation": "Web UI 啟動", "timeout": timeout}, + error_type=ErrorType.SYSTEM + ) + user_error_msg = ErrorHandler.format_user_error(e, include_technical=False) + debug_log(f"❌ Web UI 錯誤 [錯誤ID: {error_id}]: {e}") + # 發生錯誤時也要停止 Web 服務器 try: from .web import stop_web_ui stop_web_ui() debug_log("Web UI 服務器已因錯誤而停止") except Exception as stop_error: + ErrorHandler.log_error_with_context( + stop_error, + context={"operation": "Web UI 服務器停止"}, + error_type=ErrorType.SYSTEM + ) debug_log(f"停止 Web UI 服務器時發生錯誤: {stop_error}") return { "command_logs": "", - "interactive_feedback": f"錯誤: {str(e)}", + "interactive_feedback": user_error_msg, "images": [] } diff --git a/src/mcp_feedback_enhanced/test_web_ui.py b/src/mcp_feedback_enhanced/test_web_ui.py index 863df70..3b8740f 100644 --- a/src/mcp_feedback_enhanced/test_web_ui.py +++ b/src/mcp_feedback_enhanced/test_web_ui.py @@ -95,11 +95,17 @@ def get_test_summary(): def find_free_port(): """Find a free port to use for testing""" - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('', 0)) - s.listen(1) - port = s.getsockname()[1] - return port + try: + # 嘗試使用增強的端口管理 + from .web.utils.port_manager import PortManager + return PortManager.find_free_port_enhanced(preferred_port=8765, auto_cleanup=False) + except ImportError: + # 回退到原始方法 + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('', 0)) + s.listen(1) + port = s.getsockname()[1] + return port def test_web_ui(keep_running=False): """Test the Web UI functionality""" diff --git a/src/mcp_feedback_enhanced/testing/utils.py b/src/mcp_feedback_enhanced/testing/utils.py index 114bf50..7c62ed1 100644 --- a/src/mcp_feedback_enhanced/testing/utils.py +++ b/src/mcp_feedback_enhanced/testing/utils.py @@ -54,15 +54,26 @@ class TestUtils: @staticmethod def find_free_port(start_port: int = 8765, max_attempts: int = 100) -> int: - """尋找可用端口""" - for port in range(start_port, start_port + max_attempts): - try: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('127.0.0.1', port)) - return port - except OSError: - continue - raise RuntimeError(f"無法找到可用端口 (嘗試範圍: {start_port}-{start_port + max_attempts})") + """尋找可用端口 - 使用增強的端口管理""" + try: + # 嘗試使用增強的端口管理 + from ..web.utils.port_manager import PortManager + return PortManager.find_free_port_enhanced( + preferred_port=start_port, + auto_cleanup=False, # 測試時不自動清理 + host='127.0.0.1', + max_attempts=max_attempts + ) + except ImportError: + # 回退到原始方法 + for port in range(start_port, start_port + max_attempts): + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('127.0.0.1', port)) + return port + except OSError: + continue + raise RuntimeError(f"無法找到可用端口 (嘗試範圍: {start_port}-{start_port + max_attempts})") @staticmethod def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool: diff --git a/src/mcp_feedback_enhanced/utils/__init__.py b/src/mcp_feedback_enhanced/utils/__init__.py new file mode 100644 index 0000000..2e2001e --- /dev/null +++ b/src/mcp_feedback_enhanced/utils/__init__.py @@ -0,0 +1,27 @@ +""" +MCP Feedback Enhanced 工具模組 +============================ + +提供各種工具類和函數,包括錯誤處理、資源管理等。 +""" + +from .error_handler import ErrorHandler, ErrorType +from .resource_manager import ( + ResourceManager, + get_resource_manager, + create_temp_file, + create_temp_dir, + register_process, + cleanup_all_resources +) + +__all__ = [ + 'ErrorHandler', + 'ErrorType', + 'ResourceManager', + 'get_resource_manager', + 'create_temp_file', + 'create_temp_dir', + 'register_process', + 'cleanup_all_resources' +] diff --git a/src/mcp_feedback_enhanced/utils/error_handler.py b/src/mcp_feedback_enhanced/utils/error_handler.py new file mode 100644 index 0000000..dd6a867 --- /dev/null +++ b/src/mcp_feedback_enhanced/utils/error_handler.py @@ -0,0 +1,455 @@ +""" +統一錯誤處理框架 +================ + +提供統一的錯誤處理機制,包括: +- 錯誤類型分類 +- 用戶友好錯誤信息 +- 錯誤上下文記錄 +- 解決方案建議 +- 國際化支持 + +注意:此模組不會影響 JSON RPC 通信,所有錯誤處理都在應用層進行。 +""" + +import os +import sys +import traceback +import time +from enum import Enum +from typing import Dict, Any, Optional, List, Tuple +from ..debug import debug_log + + +class ErrorType(Enum): + """錯誤類型枚舉""" + NETWORK = "network" # 網絡相關錯誤 + FILE_IO = "file_io" # 文件 I/O 錯誤 + PROCESS = "process" # 進程相關錯誤 + TIMEOUT = "timeout" # 超時錯誤 + USER_CANCEL = "user_cancel" # 用戶取消操作 + SYSTEM = "system" # 系統錯誤 + PERMISSION = "permission" # 權限錯誤 + VALIDATION = "validation" # 數據驗證錯誤 + DEPENDENCY = "dependency" # 依賴錯誤 + CONFIGURATION = "config" # 配置錯誤 + + +class ErrorSeverity(Enum): + """錯誤嚴重程度""" + LOW = "low" # 低:不影響核心功能 + MEDIUM = "medium" # 中:影響部分功能 + HIGH = "high" # 高:影響核心功能 + CRITICAL = "critical" # 嚴重:系統無法正常運行 + + +class ErrorHandler: + """統一錯誤處理器""" + + # 錯誤類型到用戶友好信息的映射 + _ERROR_MESSAGES = { + ErrorType.NETWORK: { + "zh-TW": "網絡連接出現問題", + "zh-CN": "网络连接出现问题", + "en": "Network connection issue" + }, + ErrorType.FILE_IO: { + "zh-TW": "文件讀寫出現問題", + "zh-CN": "文件读写出现问题", + "en": "File read/write issue" + }, + ErrorType.PROCESS: { + "zh-TW": "進程執行出現問題", + "zh-CN": "进程执行出现问题", + "en": "Process execution issue" + }, + ErrorType.TIMEOUT: { + "zh-TW": "操作超時", + "zh-CN": "操作超时", + "en": "Operation timeout" + }, + ErrorType.USER_CANCEL: { + "zh-TW": "用戶取消了操作", + "zh-CN": "用户取消了操作", + "en": "User cancelled the operation" + }, + ErrorType.SYSTEM: { + "zh-TW": "系統出現問題", + "zh-CN": "系统出现问题", + "en": "System issue" + }, + ErrorType.PERMISSION: { + "zh-TW": "權限不足", + "zh-CN": "权限不足", + "en": "Insufficient permissions" + }, + ErrorType.VALIDATION: { + "zh-TW": "數據驗證失敗", + "zh-CN": "数据验证失败", + "en": "Data validation failed" + }, + ErrorType.DEPENDENCY: { + "zh-TW": "依賴組件出現問題", + "zh-CN": "依赖组件出现问题", + "en": "Dependency issue" + }, + ErrorType.CONFIGURATION: { + "zh-TW": "配置出現問題", + "zh-CN": "配置出现问题", + "en": "Configuration issue" + } + } + + # 錯誤解決建議 + _ERROR_SOLUTIONS = { + ErrorType.NETWORK: { + "zh-TW": [ + "檢查網絡連接是否正常", + "確認防火牆設置", + "嘗試重新啟動應用程序" + ], + "zh-CN": [ + "检查网络连接是否正常", + "确认防火墙设置", + "尝试重新启动应用程序" + ], + "en": [ + "Check network connection", + "Verify firewall settings", + "Try restarting the application" + ] + }, + ErrorType.FILE_IO: { + "zh-TW": [ + "檢查文件是否存在", + "確認文件權限", + "檢查磁盤空間是否足夠" + ], + "zh-CN": [ + "检查文件是否存在", + "确认文件权限", + "检查磁盘空间是否足够" + ], + "en": [ + "Check if file exists", + "Verify file permissions", + "Check available disk space" + ] + }, + ErrorType.PROCESS: { + "zh-TW": [ + "檢查進程是否正在運行", + "確認系統資源是否足夠", + "嘗試重新啟動相關服務" + ], + "zh-CN": [ + "检查进程是否正在运行", + "确认系统资源是否足够", + "尝试重新启动相关服务" + ], + "en": [ + "Check if process is running", + "Verify system resources", + "Try restarting related services" + ] + }, + ErrorType.TIMEOUT: { + "zh-TW": [ + "增加超時時間設置", + "檢查網絡延遲", + "稍後重試操作" + ], + "zh-CN": [ + "增加超时时间设置", + "检查网络延迟", + "稍后重试操作" + ], + "en": [ + "Increase timeout settings", + "Check network latency", + "Retry the operation later" + ] + }, + ErrorType.PERMISSION: { + "zh-TW": [ + "以管理員身份運行", + "檢查文件/目錄權限", + "聯繫系統管理員" + ], + "zh-CN": [ + "以管理员身份运行", + "检查文件/目录权限", + "联系系统管理员" + ], + "en": [ + "Run as administrator", + "Check file/directory permissions", + "Contact system administrator" + ] + } + } + + @staticmethod + def get_current_language() -> str: + """獲取當前語言設置""" + try: + # 嘗試從 i18n 模組獲取當前語言 + from ..i18n import get_i18n_manager + return get_i18n_manager().get_current_language() + except Exception: + # 回退到環境變數或默認語言 + return os.getenv("MCP_LANGUAGE", "zh-TW") + + @staticmethod + def get_i18n_error_message(error_type: ErrorType) -> str: + """從國際化系統獲取錯誤信息""" + try: + from ..i18n import get_i18n_manager + i18n = get_i18n_manager() + key = f"errors.types.{error_type.value}" + message = i18n.t(key) + # 如果返回的是鍵本身,說明沒有找到翻譯,使用回退 + if message == key: + raise Exception("Translation not found") + return message + except Exception: + # 回退到內建映射 + language = ErrorHandler.get_current_language() + error_messages = ErrorHandler._ERROR_MESSAGES.get(error_type, {}) + return error_messages.get(language, error_messages.get("zh-TW", "發生未知錯誤")) + + @staticmethod + def get_i18n_error_solutions(error_type: ErrorType) -> List[str]: + """從國際化系統獲取錯誤解決方案""" + try: + from ..i18n import get_i18n_manager + i18n = get_i18n_manager() + key = f"errors.solutions.{error_type.value}" + solutions = i18n.t(key) + if isinstance(solutions, list) and len(solutions) > 0: + return solutions + # 如果沒有找到或為空,使用回退 + raise Exception("Solutions not found") + except Exception: + # 回退到內建映射 + language = ErrorHandler.get_current_language() + solutions = ErrorHandler._ERROR_SOLUTIONS.get(error_type, {}) + return solutions.get(language, solutions.get("zh-TW", [])) + + @staticmethod + def classify_error(error: Exception) -> ErrorType: + """ + 根據異常類型自動分類錯誤 + + Args: + error: Python 異常對象 + + Returns: + ErrorType: 錯誤類型 + """ + error_name = type(error).__name__ + error_message = str(error).lower() + + # 超時錯誤(優先檢查,避免被網絡錯誤覆蓋) + if 'timeout' in error_name.lower() or 'timeout' in error_message: + return ErrorType.TIMEOUT + + # 權限錯誤(優先檢查,避免被文件錯誤覆蓋) + if 'permission' in error_name.lower(): + return ErrorType.PERMISSION + if any(keyword in error_message for keyword in ['permission denied', 'access denied', 'forbidden']): + return ErrorType.PERMISSION + + # 網絡相關錯誤 + if any(keyword in error_name.lower() for keyword in ['connection', 'network', 'socket']): + return ErrorType.NETWORK + if any(keyword in error_message for keyword in ['connection', 'network', 'socket']): + return ErrorType.NETWORK + + # 文件 I/O 錯誤 + if any(keyword in error_name.lower() for keyword in ['file', 'ioerror']): # 使用更精確的匹配 + return ErrorType.FILE_IO + if any(keyword in error_message for keyword in ['file', 'directory', 'no such file']): + return ErrorType.FILE_IO + + # 進程相關錯誤 + if any(keyword in error_name.lower() for keyword in ['process', 'subprocess']): + return ErrorType.PROCESS + if any(keyword in error_message for keyword in ['process', 'command', 'executable']): + return ErrorType.PROCESS + + # 驗證錯誤 + if any(keyword in error_name.lower() for keyword in ['validation', 'value', 'type']): + return ErrorType.VALIDATION + + # 配置錯誤 + if any(keyword in error_message for keyword in ['config', 'setting', 'environment']): + return ErrorType.CONFIGURATION + + # 默認為系統錯誤 + return ErrorType.SYSTEM + + @staticmethod + def format_user_error( + error: Exception, + error_type: Optional[ErrorType] = None, + context: Optional[Dict[str, Any]] = None, + include_technical: bool = False + ) -> str: + """ + 將技術錯誤轉換為用戶友好的錯誤信息 + + Args: + error: Python 異常對象 + error_type: 錯誤類型(可選,會自動分類) + context: 錯誤上下文信息 + include_technical: 是否包含技術細節 + + Returns: + str: 用戶友好的錯誤信息 + """ + # 自動分類錯誤類型 + if error_type is None: + error_type = ErrorHandler.classify_error(error) + + # 獲取當前語言 + language = ErrorHandler.get_current_language() + + # 獲取用戶友好的錯誤信息(優先使用國際化系統) + user_message = ErrorHandler.get_i18n_error_message(error_type) + + # 構建完整的錯誤信息 + parts = [f"❌ {user_message}"] + + # 添加上下文信息 + if context: + if context.get("operation"): + if language == "en": + parts.append(f"Operation: {context['operation']}") + else: + parts.append(f"操作:{context['operation']}") + + if context.get("file_path"): + if language == "en": + parts.append(f"File: {context['file_path']}") + else: + parts.append(f"文件:{context['file_path']}") + + # 添加技術細節(如果需要) + if include_technical: + if language == "en": + parts.append(f"Technical details: {type(error).__name__}: {str(error)}") + else: + parts.append(f"技術細節:{type(error).__name__}: {str(error)}") + + return "\n".join(parts) + + @staticmethod + def get_error_solutions(error_type: ErrorType) -> List[str]: + """ + 獲取錯誤解決建議 + + Args: + error_type: 錯誤類型 + + Returns: + List[str]: 解決建議列表 + """ + return ErrorHandler.get_i18n_error_solutions(error_type) + + @staticmethod + def log_error_with_context( + error: Exception, + context: Optional[Dict[str, Any]] = None, + error_type: Optional[ErrorType] = None, + severity: ErrorSeverity = ErrorSeverity.MEDIUM + ) -> str: + """ + 記錄帶上下文的錯誤信息(不影響 JSON RPC) + + Args: + error: Python 異常對象 + context: 錯誤上下文信息 + error_type: 錯誤類型 + severity: 錯誤嚴重程度 + + Returns: + str: 錯誤 ID,用於追蹤 + """ + # 生成錯誤 ID + error_id = f"ERR_{int(time.time())}_{id(error) % 10000}" + + # 自動分類錯誤 + if error_type is None: + error_type = ErrorHandler.classify_error(error) + + # 構建錯誤記錄 + error_record = { + "error_id": error_id, + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), + "error_type": error_type.value, + "severity": severity.value, + "exception_type": type(error).__name__, + "exception_message": str(error), + "context": context or {}, + "traceback": traceback.format_exc() if severity in [ErrorSeverity.HIGH, ErrorSeverity.CRITICAL] else None + } + + # 記錄到調試日誌(不影響 JSON RPC) + debug_log(f"錯誤記錄 [{error_id}]: {error_type.value} - {str(error)}") + + if context: + debug_log(f"錯誤上下文 [{error_id}]: {context}") + + # 對於嚴重錯誤,記錄完整堆棧跟蹤 + if severity in [ErrorSeverity.HIGH, ErrorSeverity.CRITICAL]: + debug_log(f"錯誤堆棧 [{error_id}]:\n{traceback.format_exc()}") + + return error_id + + @staticmethod + def create_error_response( + error: Exception, + context: Optional[Dict[str, Any]] = None, + error_type: Optional[ErrorType] = None, + include_solutions: bool = True, + for_user: bool = True + ) -> Dict[str, Any]: + """ + 創建標準化的錯誤響應 + + Args: + error: Python 異常對象 + context: 錯誤上下文 + error_type: 錯誤類型 + include_solutions: 是否包含解決建議 + for_user: 是否為用戶界面使用 + + Returns: + Dict[str, Any]: 標準化錯誤響應 + """ + # 自動分類錯誤 + if error_type is None: + error_type = ErrorHandler.classify_error(error) + + # 記錄錯誤 + error_id = ErrorHandler.log_error_with_context(error, context, error_type) + + # 構建響應 + response = { + "success": False, + "error_id": error_id, + "error_type": error_type.value, + "message": ErrorHandler.format_user_error(error, error_type, context, include_technical=not for_user) + } + + # 添加解決建議 + if include_solutions: + solutions = ErrorHandler.get_error_solutions(error_type) + response["solutions"] = solutions # 即使為空列表也添加 + + # 添加上下文(僅用於調試) + if context and not for_user: + response["context"] = context + + return response diff --git a/src/mcp_feedback_enhanced/utils/resource_manager.py b/src/mcp_feedback_enhanced/utils/resource_manager.py new file mode 100644 index 0000000..205c180 --- /dev/null +++ b/src/mcp_feedback_enhanced/utils/resource_manager.py @@ -0,0 +1,719 @@ +""" +統一資源管理器 +============== + +提供統一的資源管理功能,包括: +- 臨時文件和目錄管理 +- 進程生命週期追蹤 +- 自動資源清理 +- 資源使用監控 +""" + +import os +import sys +import time +import atexit +import shutil +import tempfile +import threading +import subprocess +import weakref +from pathlib import Path +from typing import Set, Dict, Any, Optional, List, Union +from ..debug import debug_log +from .error_handler import ErrorHandler, ErrorType + + +class ResourceType: + """資源類型常量""" + TEMP_FILE = "temp_file" + TEMP_DIR = "temp_dir" + PROCESS = "process" + FILE_HANDLE = "file_handle" + + +class ResourceManager: + """統一資源管理器 - 提供完整的資源生命週期管理""" + + _instance = None + _lock = threading.Lock() + + def __new__(cls): + """單例模式實現""" + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + """初始化資源管理器""" + if hasattr(self, '_initialized'): + return + + self._initialized = True + + # 資源追蹤集合 + self.temp_files: Set[str] = set() + self.temp_dirs: Set[str] = set() + self.processes: Dict[int, Dict[str, Any]] = {} + self.file_handles: Set[Any] = set() + + # 資源統計 + self.stats = { + "temp_files_created": 0, + "temp_dirs_created": 0, + "processes_registered": 0, + "cleanup_runs": 0, + "last_cleanup": None + } + + # 配置 + self.auto_cleanup_enabled = True + self.cleanup_interval = 300 # 5分鐘 + self.temp_file_max_age = 3600 # 1小時 + + # 清理線程 + self._cleanup_thread: Optional[threading.Thread] = None + self._stop_cleanup = threading.Event() + + # 註冊退出清理 + atexit.register(self.cleanup_all) + + # 啟動自動清理 + self._start_auto_cleanup() + + debug_log("ResourceManager 初始化完成") + + def create_temp_file( + self, + suffix: str = "", + prefix: str = "mcp_", + dir: Optional[str] = None, + text: bool = True + ) -> str: + """ + 創建臨時文件並追蹤 + + Args: + suffix: 文件後綴 + prefix: 文件前綴 + dir: 臨時目錄,None 使用系統默認 + text: 是否為文本模式 + + Returns: + str: 臨時文件路徑 + """ + try: + # 創建臨時文件 + fd, temp_path = tempfile.mkstemp( + suffix=suffix, + prefix=prefix, + dir=dir, + text=text + ) + os.close(fd) # 關閉文件描述符 + + # 追蹤文件 + self.temp_files.add(temp_path) + self.stats["temp_files_created"] += 1 + + debug_log(f"創建臨時文件: {temp_path}") + return temp_path + + except Exception as e: + error_id = ErrorHandler.log_error_with_context( + e, + context={"operation": "創建臨時文件", "suffix": suffix, "prefix": prefix}, + error_type=ErrorType.FILE_IO + ) + debug_log(f"創建臨時文件失敗 [錯誤ID: {error_id}]: {e}") + raise + + def create_temp_dir( + self, + suffix: str = "", + prefix: str = "mcp_", + dir: Optional[str] = None + ) -> str: + """ + 創建臨時目錄並追蹤 + + Args: + suffix: 目錄後綴 + prefix: 目錄前綴 + dir: 父目錄,None 使用系統默認 + + Returns: + str: 臨時目錄路徑 + """ + try: + # 創建臨時目錄 + temp_dir = tempfile.mkdtemp( + suffix=suffix, + prefix=prefix, + dir=dir + ) + + # 追蹤目錄 + self.temp_dirs.add(temp_dir) + self.stats["temp_dirs_created"] += 1 + + debug_log(f"創建臨時目錄: {temp_dir}") + return temp_dir + + except Exception as e: + error_id = ErrorHandler.log_error_with_context( + e, + context={"operation": "創建臨時目錄", "suffix": suffix, "prefix": prefix}, + error_type=ErrorType.FILE_IO + ) + debug_log(f"創建臨時目錄失敗 [錯誤ID: {error_id}]: {e}") + raise + + def register_process( + self, + process: Union[subprocess.Popen, int], + description: str = "", + auto_cleanup: bool = True + ) -> int: + """ + 註冊進程追蹤 + + Args: + process: 進程對象或 PID + description: 進程描述 + auto_cleanup: 是否自動清理 + + Returns: + int: 進程 PID + """ + try: + if isinstance(process, subprocess.Popen): + pid = process.pid + process_obj = process + else: + pid = process + process_obj = None + + # 註冊進程 + self.processes[pid] = { + "process": process_obj, + "description": description, + "auto_cleanup": auto_cleanup, + "registered_at": time.time(), + "last_check": time.time() + } + + self.stats["processes_registered"] += 1 + + debug_log(f"註冊進程追蹤: PID {pid} - {description}") + return pid + + except Exception as e: + error_id = ErrorHandler.log_error_with_context( + e, + context={"operation": "註冊進程", "description": description}, + error_type=ErrorType.PROCESS + ) + debug_log(f"註冊進程失敗 [錯誤ID: {error_id}]: {e}") + raise + + def register_file_handle(self, file_handle: Any) -> None: + """ + 註冊文件句柄追蹤 + + Args: + file_handle: 文件句柄對象 + """ + try: + # 使用弱引用避免循環引用 + self.file_handles.add(weakref.ref(file_handle)) + debug_log(f"註冊文件句柄: {type(file_handle).__name__}") + + except Exception as e: + error_id = ErrorHandler.log_error_with_context( + e, + context={"operation": "註冊文件句柄"}, + error_type=ErrorType.FILE_IO + ) + debug_log(f"註冊文件句柄失敗 [錯誤ID: {error_id}]: {e}") + + def unregister_temp_file(self, file_path: str) -> bool: + """ + 取消臨時文件追蹤 + + Args: + file_path: 文件路徑 + + Returns: + bool: 是否成功取消追蹤 + """ + try: + if file_path in self.temp_files: + self.temp_files.remove(file_path) + debug_log(f"取消臨時文件追蹤: {file_path}") + return True + return False + + except Exception as e: + error_id = ErrorHandler.log_error_with_context( + e, + context={"operation": "取消文件追蹤", "file_path": file_path}, + error_type=ErrorType.FILE_IO + ) + debug_log(f"取消文件追蹤失敗 [錯誤ID: {error_id}]: {e}") + return False + + def unregister_process(self, pid: int) -> bool: + """ + 取消進程追蹤 + + Args: + pid: 進程 PID + + Returns: + bool: 是否成功取消追蹤 + """ + try: + if pid in self.processes: + del self.processes[pid] + debug_log(f"取消進程追蹤: PID {pid}") + return True + return False + + except Exception as e: + error_id = ErrorHandler.log_error_with_context( + e, + context={"operation": "取消進程追蹤", "pid": pid}, + error_type=ErrorType.PROCESS + ) + debug_log(f"取消進程追蹤失敗 [錯誤ID: {error_id}]: {e}") + return False + + def cleanup_temp_files(self, max_age: Optional[int] = None) -> int: + """ + 清理臨時文件 + + Args: + max_age: 最大文件年齡(秒),None 使用默認值 + + Returns: + int: 清理的文件數量 + """ + if max_age is None: + max_age = self.temp_file_max_age + + cleaned_count = 0 + current_time = time.time() + files_to_remove = set() + + for file_path in self.temp_files.copy(): + try: + if not os.path.exists(file_path): + files_to_remove.add(file_path) + continue + + # 檢查文件年齡 + file_age = current_time - os.path.getmtime(file_path) + if file_age > max_age: + os.remove(file_path) + files_to_remove.add(file_path) + cleaned_count += 1 + debug_log(f"清理過期臨時文件: {file_path}") + + except Exception as e: + error_id = ErrorHandler.log_error_with_context( + e, + context={"operation": "清理臨時文件", "file_path": file_path}, + error_type=ErrorType.FILE_IO + ) + debug_log(f"清理臨時文件失敗 [錯誤ID: {error_id}]: {e}") + files_to_remove.add(file_path) # 移除無效追蹤 + + # 移除已清理的文件追蹤 + self.temp_files -= files_to_remove + + return cleaned_count + + def cleanup_temp_dirs(self) -> int: + """ + 清理臨時目錄 + + Returns: + int: 清理的目錄數量 + """ + cleaned_count = 0 + dirs_to_remove = set() + + for dir_path in self.temp_dirs.copy(): + try: + if not os.path.exists(dir_path): + dirs_to_remove.add(dir_path) + continue + + # 嘗試刪除目錄 + shutil.rmtree(dir_path) + dirs_to_remove.add(dir_path) + cleaned_count += 1 + debug_log(f"清理臨時目錄: {dir_path}") + + except Exception as e: + error_id = ErrorHandler.log_error_with_context( + e, + context={"operation": "清理臨時目錄", "dir_path": dir_path}, + error_type=ErrorType.FILE_IO + ) + debug_log(f"清理臨時目錄失敗 [錯誤ID: {error_id}]: {e}") + dirs_to_remove.add(dir_path) # 移除無效追蹤 + + # 移除已清理的目錄追蹤 + self.temp_dirs -= dirs_to_remove + + return cleaned_count + + def cleanup_processes(self, force: bool = False) -> int: + """ + 清理進程 + + Args: + force: 是否強制終止進程 + + Returns: + int: 清理的進程數量 + """ + cleaned_count = 0 + processes_to_remove = [] + + for pid, process_info in self.processes.copy().items(): + try: + process_obj = process_info.get("process") + auto_cleanup = process_info.get("auto_cleanup", True) + + if not auto_cleanup: + continue + + # 檢查進程是否還在運行 + if process_obj and hasattr(process_obj, 'poll'): + if process_obj.poll() is None: # 進程還在運行 + if force: + debug_log(f"強制終止進程: PID {pid}") + process_obj.kill() + else: + debug_log(f"優雅終止進程: PID {pid}") + process_obj.terminate() + + # 等待進程結束 + try: + process_obj.wait(timeout=5) + cleaned_count += 1 + except subprocess.TimeoutExpired: + if not force: + debug_log(f"進程 {pid} 優雅終止超時,強制終止") + process_obj.kill() + process_obj.wait(timeout=3) + cleaned_count += 1 + + processes_to_remove.append(pid) + else: + # 使用 psutil 檢查進程 + try: + import psutil + if psutil.pid_exists(pid): + proc = psutil.Process(pid) + if force: + proc.kill() + else: + proc.terminate() + proc.wait(timeout=5) + cleaned_count += 1 + processes_to_remove.append(pid) + except ImportError: + debug_log("psutil 不可用,跳過進程檢查") + processes_to_remove.append(pid) + except Exception as e: + debug_log(f"清理進程 {pid} 失敗: {e}") + processes_to_remove.append(pid) + + except Exception as e: + error_id = ErrorHandler.log_error_with_context( + e, + context={"operation": "清理進程", "pid": pid}, + error_type=ErrorType.PROCESS + ) + debug_log(f"清理進程失敗 [錯誤ID: {error_id}]: {e}") + processes_to_remove.append(pid) + + # 移除已清理的進程追蹤 + for pid in processes_to_remove: + self.processes.pop(pid, None) + + return cleaned_count + + def cleanup_file_handles(self) -> int: + """ + 清理文件句柄 + + Returns: + int: 清理的句柄數量 + """ + cleaned_count = 0 + handles_to_remove = set() + + for handle_ref in self.file_handles.copy(): + try: + handle = handle_ref() + if handle is None: + # 弱引用已失效 + handles_to_remove.add(handle_ref) + continue + + # 嘗試關閉文件句柄 + if hasattr(handle, 'close') and not handle.closed: + handle.close() + cleaned_count += 1 + debug_log(f"關閉文件句柄: {type(handle).__name__}") + + handles_to_remove.add(handle_ref) + + except Exception as e: + error_id = ErrorHandler.log_error_with_context( + e, + context={"operation": "清理文件句柄"}, + error_type=ErrorType.FILE_IO + ) + debug_log(f"清理文件句柄失敗 [錯誤ID: {error_id}]: {e}") + handles_to_remove.add(handle_ref) + + # 移除已清理的句柄追蹤 + self.file_handles -= handles_to_remove + + return cleaned_count + + def cleanup_all(self, force: bool = False) -> Dict[str, int]: + """ + 清理所有資源 + + Args: + force: 是否強制清理 + + Returns: + Dict[str, int]: 清理統計 + """ + debug_log("開始全面資源清理...") + + results = { + "temp_files": 0, + "temp_dirs": 0, + "processes": 0, + "file_handles": 0 + } + + try: + # 清理文件句柄 + results["file_handles"] = self.cleanup_file_handles() + + # 清理進程 + results["processes"] = self.cleanup_processes(force=force) + + # 清理臨時文件 + results["temp_files"] = self.cleanup_temp_files(max_age=0) # 清理所有文件 + + # 清理臨時目錄 + results["temp_dirs"] = self.cleanup_temp_dirs() + + # 更新統計 + self.stats["cleanup_runs"] += 1 + self.stats["last_cleanup"] = time.time() + + total_cleaned = sum(results.values()) + debug_log(f"資源清理完成,共清理 {total_cleaned} 個資源: {results}") + + except Exception as e: + error_id = ErrorHandler.log_error_with_context( + e, + context={"operation": "全面資源清理"}, + error_type=ErrorType.SYSTEM + ) + debug_log(f"全面資源清理失敗 [錯誤ID: {error_id}]: {e}") + + return results + + def _start_auto_cleanup(self) -> None: + """啟動自動清理線程""" + if not self.auto_cleanup_enabled or self._cleanup_thread: + return + + def cleanup_worker(): + """清理工作線程""" + while not self._stop_cleanup.wait(self.cleanup_interval): + try: + # 執行定期清理 + self.cleanup_temp_files() + self._check_process_health() + + except Exception as e: + error_id = ErrorHandler.log_error_with_context( + e, + context={"operation": "自動清理"}, + error_type=ErrorType.SYSTEM + ) + debug_log(f"自動清理失敗 [錯誤ID: {error_id}]: {e}") + + self._cleanup_thread = threading.Thread( + target=cleanup_worker, + name="ResourceManager-AutoCleanup", + daemon=True + ) + self._cleanup_thread.start() + debug_log("自動清理線程已啟動") + + def _check_process_health(self) -> None: + """檢查進程健康狀態""" + current_time = time.time() + + for pid, process_info in self.processes.items(): + try: + process_obj = process_info.get("process") + last_check = process_info.get("last_check", current_time) + + # 每分鐘檢查一次 + if current_time - last_check < 60: + continue + + # 更新檢查時間 + process_info["last_check"] = current_time + + # 檢查進程是否還在運行 + if process_obj and hasattr(process_obj, 'poll'): + if process_obj.poll() is not None: + # 進程已結束,移除追蹤 + debug_log(f"檢測到進程 {pid} 已結束,移除追蹤") + self.unregister_process(pid) + + except Exception as e: + debug_log(f"檢查進程 {pid} 健康狀態失敗: {e}") + + def stop_auto_cleanup(self) -> None: + """停止自動清理""" + if self._cleanup_thread: + self._stop_cleanup.set() + self._cleanup_thread.join(timeout=5) + self._cleanup_thread = None + debug_log("自動清理線程已停止") + + def get_resource_stats(self) -> Dict[str, Any]: + """ + 獲取資源統計信息 + + Returns: + Dict[str, Any]: 資源統計 + """ + current_stats = self.stats.copy() + current_stats.update({ + "current_temp_files": len(self.temp_files), + "current_temp_dirs": len(self.temp_dirs), + "current_processes": len(self.processes), + "current_file_handles": len(self.file_handles), + "auto_cleanup_enabled": self.auto_cleanup_enabled, + "cleanup_interval": self.cleanup_interval, + "temp_file_max_age": self.temp_file_max_age + }) + + return current_stats + + def get_detailed_info(self) -> Dict[str, Any]: + """ + 獲取詳細資源信息 + + Returns: + Dict[str, Any]: 詳細資源信息 + """ + return { + "temp_files": list(self.temp_files), + "temp_dirs": list(self.temp_dirs), + "processes": { + pid: { + "description": info.get("description", ""), + "auto_cleanup": info.get("auto_cleanup", True), + "registered_at": info.get("registered_at", 0), + "last_check": info.get("last_check", 0) + } + for pid, info in self.processes.items() + }, + "file_handles_count": len(self.file_handles), + "stats": self.get_resource_stats() + } + + def configure( + self, + auto_cleanup_enabled: Optional[bool] = None, + cleanup_interval: Optional[int] = None, + temp_file_max_age: Optional[int] = None + ) -> None: + """ + 配置資源管理器 + + Args: + auto_cleanup_enabled: 是否啟用自動清理 + cleanup_interval: 清理間隔(秒) + temp_file_max_age: 臨時文件最大年齡(秒) + """ + if auto_cleanup_enabled is not None: + old_enabled = self.auto_cleanup_enabled + self.auto_cleanup_enabled = auto_cleanup_enabled + + if old_enabled and not auto_cleanup_enabled: + self.stop_auto_cleanup() + elif not old_enabled and auto_cleanup_enabled: + self._start_auto_cleanup() + elif auto_cleanup_enabled and self._cleanup_thread is None: + # 如果啟用了自動清理但線程不存在,重新啟動 + self._start_auto_cleanup() + + if cleanup_interval is not None: + self.cleanup_interval = max(60, cleanup_interval) # 最小1分鐘 + + if temp_file_max_age is not None: + self.temp_file_max_age = max(300, temp_file_max_age) # 最小5分鐘 + + debug_log(f"ResourceManager 配置已更新: auto_cleanup={self.auto_cleanup_enabled}, " + f"interval={self.cleanup_interval}, max_age={self.temp_file_max_age}") + + +# 全局資源管理器實例 +_resource_manager = None + + +def get_resource_manager() -> ResourceManager: + """ + 獲取全局資源管理器實例 + + Returns: + ResourceManager: 資源管理器實例 + """ + global _resource_manager + if _resource_manager is None: + _resource_manager = ResourceManager() + return _resource_manager + + +# 便捷函數 +def create_temp_file(suffix: str = "", prefix: str = "mcp_", **kwargs) -> str: + """創建臨時文件的便捷函數""" + return get_resource_manager().create_temp_file(suffix=suffix, prefix=prefix, **kwargs) + + +def create_temp_dir(suffix: str = "", prefix: str = "mcp_", **kwargs) -> str: + """創建臨時目錄的便捷函數""" + return get_resource_manager().create_temp_dir(suffix=suffix, prefix=prefix, **kwargs) + + +def register_process(process: Union[subprocess.Popen, int], description: str = "", **kwargs) -> int: + """註冊進程的便捷函數""" + return get_resource_manager().register_process(process, description=description, **kwargs) + + +def cleanup_all_resources(force: bool = False) -> Dict[str, int]: + """清理所有資源的便捷函數""" + return get_resource_manager().cleanup_all(force=force) diff --git a/src/mcp_feedback_enhanced/web/main.py b/src/mcp_feedback_enhanced/web/main.py index 1e940bd..a837aed 100644 --- a/src/mcp_feedback_enhanced/web/main.py +++ b/src/mcp_feedback_enhanced/web/main.py @@ -20,14 +20,18 @@ from pathlib import Path from typing import Dict, Optional import uuid -from fastapi import FastAPI +from fastapi import FastAPI, Request, Response from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates +from fastapi.middleware.gzip import GZipMiddleware import uvicorn from .models import WebFeedbackSession, FeedbackResult from .routes import setup_routes from .utils import find_free_port, get_browser_opener +from .utils.port_manager import PortManager +from .utils.compression_config import get_compression_manager +from ..utils.error_handler import ErrorHandler, ErrorType from ..debug import web_debug_log as debug_log from ..i18n import get_i18n_manager @@ -56,10 +60,17 @@ class WebUIManager: else: debug_log(f"未設定 MCP_WEB_PORT 環境變數,使用預設端口 {preferred_port}") - # 優先使用指定端口,確保 localStorage 的一致性 - self.port = port or find_free_port(preferred_port=preferred_port) + # 使用增強的端口管理,支持自動清理 + self.port = port or PortManager.find_free_port_enhanced( + preferred_port=preferred_port, + auto_cleanup=True, + host=self.host + ) self.app = FastAPI(title="MCP Feedback Enhanced") + # 設置壓縮和緩存中間件 + self._setup_compression_middleware() + # 重構:使用單一活躍會話而非會話字典 self.current_session: Optional[WebFeedbackSession] = None self.sessions: Dict[str, WebFeedbackSession] = {} # 保留用於向後兼容 @@ -83,6 +94,48 @@ class WebUIManager: debug_log(f"WebUIManager 初始化完成,將在 {self.host}:{self.port} 啟動") + def _setup_compression_middleware(self): + """設置壓縮和緩存中間件""" + # 獲取壓縮管理器 + compression_manager = get_compression_manager() + config = compression_manager.config + + # 添加 Gzip 壓縮中間件 + self.app.add_middleware( + GZipMiddleware, + minimum_size=config.minimum_size + ) + + # 添加緩存和壓縮統計中間件 + @self.app.middleware("http") + async def compression_and_cache_middleware(request: Request, call_next): + """壓縮和緩存中間件""" + response = await call_next(request) + + # 添加緩存頭 + if not config.should_exclude_path(request.url.path): + cache_headers = config.get_cache_headers(request.url.path) + for key, value in cache_headers.items(): + response.headers[key] = value + + # 更新壓縮統計(如果可能) + try: + content_length = int(response.headers.get('content-length', 0)) + content_encoding = response.headers.get('content-encoding', '') + was_compressed = 'gzip' in content_encoding + + if content_length > 0: + # 估算原始大小(如果已壓縮,假設壓縮比為 30%) + original_size = content_length if not was_compressed else int(content_length / 0.7) + compression_manager.update_stats(original_size, content_length, was_compressed) + except (ValueError, TypeError): + # 忽略統計錯誤,不影響正常響應 + pass + + return response + + debug_log("壓縮和緩存中間件設置完成") + def _setup_static_files(self): """設置靜態文件服務""" # Web UI 靜態文件 @@ -262,16 +315,44 @@ class WebUIManager: if e.errno == 10048: # Windows: 位址已在使用中 retry_count += 1 if retry_count < max_retries: - debug_log(f"端口 {self.port} 被占用,嘗試下一個端口") - self.port = find_free_port(self.port + 1) + debug_log(f"端口 {self.port} 被占用,使用增強端口管理查找新端口") + # 使用增強的端口管理查找新端口 + try: + self.port = PortManager.find_free_port_enhanced( + preferred_port=self.port + 1, + auto_cleanup=False, # 啟動時不自動清理,避免誤殺其他服務 + host=self.host + ) + debug_log(f"找到新的可用端口: {self.port}") + except RuntimeError as port_error: + # 使用統一錯誤處理 + error_id = ErrorHandler.log_error_with_context( + port_error, + context={"operation": "端口查找", "current_port": self.port}, + error_type=ErrorType.NETWORK + ) + debug_log(f"無法找到可用端口 [錯誤ID: {error_id}]: {port_error}") + break else: debug_log("已達到最大重試次數,無法啟動伺服器") break else: - debug_log(f"伺服器啟動錯誤: {e}") + # 使用統一錯誤處理 + error_id = ErrorHandler.log_error_with_context( + e, + context={"operation": "伺服器啟動", "host": self.host, "port": self.port}, + error_type=ErrorType.NETWORK + ) + debug_log(f"伺服器啟動錯誤 [錯誤ID: {error_id}]: {e}") break except Exception as e: - debug_log(f"伺服器運行錯誤: {e}") + # 使用統一錯誤處理 + error_id = ErrorHandler.log_error_with_context( + e, + context={"operation": "伺服器運行", "host": self.host, "port": self.port}, + error_type=ErrorType.SYSTEM + ) + debug_log(f"伺服器運行錯誤 [錯誤ID: {error_id}]: {e}") break # 在新線程中啟動伺服器 diff --git a/src/mcp_feedback_enhanced/web/models/feedback_session.py b/src/mcp_feedback_enhanced/web/models/feedback_session.py index 746c2f2..6089f63 100644 --- a/src/mcp_feedback_enhanced/web/models/feedback_session.py +++ b/src/mcp_feedback_enhanced/web/models/feedback_session.py @@ -18,6 +18,7 @@ from typing import Dict, List, Optional from fastapi import WebSocket from ...debug import web_debug_log as debug_log +from ...utils.resource_manager import get_resource_manager, register_process class SessionStatus(Enum): @@ -60,6 +61,9 @@ class WebFeedbackSession: # 確保臨時目錄存在 TEMP_DIR.mkdir(parents=True, exist_ok=True) + # 獲取資源管理器實例 + self.resource_manager = get_resource_manager() + def update_status(self, status: SessionStatus, message: str = None): """更新會話狀態""" self.status = status @@ -249,6 +253,13 @@ class WebFeedbackSession: universal_newlines=True ) + # 註冊進程到資源管理器 + register_process( + self.process, + description=f"WebFeedbackSession-{self.session_id}-command", + auto_cleanup=True + ) + # 在背景線程中讀取輸出 async def read_output(): loop = asyncio.get_event_loop() @@ -281,7 +292,10 @@ class WebFeedbackSession: # 等待進程完成 if self.process: exit_code = self.process.wait() - + + # 從資源管理器取消註冊進程 + self.resource_manager.unregister_process(self.process.pid) + # 發送命令完成信號 if self.websocket: try: diff --git a/src/mcp_feedback_enhanced/web/utils/compression_config.py b/src/mcp_feedback_enhanced/web/utils/compression_config.py new file mode 100644 index 0000000..5522c33 --- /dev/null +++ b/src/mcp_feedback_enhanced/web/utils/compression_config.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +壓縮配置管理器 +============== + +管理 Web UI 的 Gzip 壓縮配置和靜態文件緩存策略。 +支援可配置的壓縮參數和性能優化選項。 +""" + +import os +from typing import Dict, List, Optional, Tuple +from dataclasses import dataclass +from pathlib import Path + + +@dataclass +class CompressionConfig: + """壓縮配置類""" + + # Gzip 壓縮設定 + minimum_size: int = 1000 # 最小壓縮大小(bytes) + compression_level: int = 6 # 壓縮級別 (1-9, 6為平衡點) + + # 緩存設定 + static_cache_max_age: int = 3600 # 靜態文件緩存時間(秒) + api_cache_max_age: int = 0 # API 響應緩存時間(秒,0表示不緩存) + + # 支援的 MIME 類型 + compressible_types: List[str] = None + + # 排除的路徑 + exclude_paths: List[str] = None + + def __post_init__(self): + """初始化後處理""" + if self.compressible_types is None: + self.compressible_types = [ + 'text/html', + 'text/css', + 'text/javascript', + 'text/plain', + 'application/json', + 'application/javascript', + 'application/xml', + 'application/rss+xml', + 'application/atom+xml', + 'image/svg+xml' + ] + + if self.exclude_paths is None: + self.exclude_paths = [ + '/ws', # WebSocket 連接 + '/api/ws', # WebSocket API + '/health', # 健康檢查 + ] + + @classmethod + def from_env(cls) -> 'CompressionConfig': + """從環境變數創建配置""" + return cls( + minimum_size=int(os.getenv('MCP_GZIP_MIN_SIZE', '1000')), + compression_level=int(os.getenv('MCP_GZIP_LEVEL', '6')), + static_cache_max_age=int(os.getenv('MCP_STATIC_CACHE_AGE', '3600')), + api_cache_max_age=int(os.getenv('MCP_API_CACHE_AGE', '0')) + ) + + def should_compress(self, content_type: str, content_length: int) -> bool: + """判斷是否應該壓縮""" + if content_length < self.minimum_size: + return False + + if not content_type: + return False + + # 檢查 MIME 類型 + for mime_type in self.compressible_types: + if content_type.startswith(mime_type): + return True + + return False + + def should_exclude_path(self, path: str) -> bool: + """判斷路徑是否應該排除壓縮""" + for exclude_path in self.exclude_paths: + if path.startswith(exclude_path): + return True + return False + + def get_cache_headers(self, path: str) -> Dict[str, str]: + """獲取緩存頭""" + headers = {} + + if path.startswith('/static/'): + # 靜態文件緩存 + headers['Cache-Control'] = f'public, max-age={self.static_cache_max_age}' + headers['Expires'] = self._get_expires_header(self.static_cache_max_age) + elif path.startswith('/api/') and self.api_cache_max_age > 0: + # API 緩存(如果啟用) + headers['Cache-Control'] = f'public, max-age={self.api_cache_max_age}' + headers['Expires'] = self._get_expires_header(self.api_cache_max_age) + else: + # 其他路徑不緩存 + headers['Cache-Control'] = 'no-cache, no-store, must-revalidate' + headers['Pragma'] = 'no-cache' + headers['Expires'] = '0' + + return headers + + def _get_expires_header(self, max_age: int) -> str: + """生成 Expires 頭""" + from datetime import datetime, timedelta + expires_time = datetime.utcnow() + timedelta(seconds=max_age) + return expires_time.strftime('%a, %d %b %Y %H:%M:%S GMT') + + def get_compression_stats(self) -> Dict[str, any]: + """獲取壓縮配置統計""" + return { + 'minimum_size': self.minimum_size, + 'compression_level': self.compression_level, + 'static_cache_max_age': self.static_cache_max_age, + 'compressible_types_count': len(self.compressible_types), + 'exclude_paths_count': len(self.exclude_paths), + 'compressible_types': self.compressible_types, + 'exclude_paths': self.exclude_paths + } + + +class CompressionManager: + """壓縮管理器""" + + def __init__(self, config: Optional[CompressionConfig] = None): + self.config = config or CompressionConfig.from_env() + self._stats = { + 'requests_total': 0, + 'requests_compressed': 0, + 'bytes_original': 0, + 'bytes_compressed': 0, + 'compression_ratio': 0.0 + } + + def update_stats(self, original_size: int, compressed_size: int, was_compressed: bool): + """更新壓縮統計""" + self._stats['requests_total'] += 1 + self._stats['bytes_original'] += original_size + + if was_compressed: + self._stats['requests_compressed'] += 1 + self._stats['bytes_compressed'] += compressed_size + else: + self._stats['bytes_compressed'] += original_size + + # 計算壓縮比率 + if self._stats['bytes_original'] > 0: + self._stats['compression_ratio'] = ( + 1 - self._stats['bytes_compressed'] / self._stats['bytes_original'] + ) * 100 + + def get_stats(self) -> Dict[str, any]: + """獲取壓縮統計""" + stats = self._stats.copy() + stats['compression_percentage'] = ( + self._stats['requests_compressed'] / max(self._stats['requests_total'], 1) * 100 + ) + return stats + + def reset_stats(self): + """重置統計""" + self._stats = { + 'requests_total': 0, + 'requests_compressed': 0, + 'bytes_original': 0, + 'bytes_compressed': 0, + 'compression_ratio': 0.0 + } + + +# 全域壓縮管理器實例 +_compression_manager: Optional[CompressionManager] = None + + +def get_compression_manager() -> CompressionManager: + """獲取全域壓縮管理器實例""" + global _compression_manager + if _compression_manager is None: + _compression_manager = CompressionManager() + return _compression_manager diff --git a/src/mcp_feedback_enhanced/web/utils/compression_monitor.py b/src/mcp_feedback_enhanced/web/utils/compression_monitor.py new file mode 100644 index 0000000..475bada --- /dev/null +++ b/src/mcp_feedback_enhanced/web/utils/compression_monitor.py @@ -0,0 +1,290 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +壓縮性能監控工具 +================ + +監控 Gzip 壓縮的性能效果,包括壓縮比率、響應時間和文件大小統計。 +提供實時性能數據和優化建議。 +""" + +import time +import threading +from typing import Dict, List, Optional, Tuple +from dataclasses import dataclass, field +from datetime import datetime, timedelta +import json + + +@dataclass +class CompressionMetrics: + """壓縮指標數據類""" + timestamp: datetime + path: str + original_size: int + compressed_size: int + compression_ratio: float + response_time: float + content_type: str + was_compressed: bool + + +@dataclass +class CompressionSummary: + """壓縮摘要統計""" + total_requests: int = 0 + compressed_requests: int = 0 + total_original_bytes: int = 0 + total_compressed_bytes: int = 0 + average_compression_ratio: float = 0.0 + average_response_time: float = 0.0 + compression_percentage: float = 0.0 + bandwidth_saved: int = 0 + top_compressed_paths: List[Tuple[str, float]] = field(default_factory=list) + + +class CompressionMonitor: + """壓縮性能監控器""" + + def __init__(self, max_metrics: int = 1000): + self.max_metrics = max_metrics + self.metrics: List[CompressionMetrics] = [] + self.lock = threading.Lock() + self._start_time = datetime.now() + + # 路徑統計 + self.path_stats: Dict[str, Dict] = {} + + # 內容類型統計 + self.content_type_stats: Dict[str, Dict] = {} + + def record_request(self, + path: str, + original_size: int, + compressed_size: int, + response_time: float, + content_type: str = "", + was_compressed: bool = False): + """記錄請求的壓縮數據""" + + compression_ratio = 0.0 + if original_size > 0 and was_compressed: + compression_ratio = (1 - compressed_size / original_size) * 100 + + metric = CompressionMetrics( + timestamp=datetime.now(), + path=path, + original_size=original_size, + compressed_size=compressed_size, + compression_ratio=compression_ratio, + response_time=response_time, + content_type=content_type, + was_compressed=was_compressed + ) + + with self.lock: + self.metrics.append(metric) + + # 限制記錄數量 + if len(self.metrics) > self.max_metrics: + self.metrics = self.metrics[-self.max_metrics:] + + # 更新路徑統計 + self._update_path_stats(metric) + + # 更新內容類型統計 + self._update_content_type_stats(metric) + + def _update_path_stats(self, metric: CompressionMetrics): + """更新路徑統計""" + path = metric.path + if path not in self.path_stats: + self.path_stats[path] = { + 'requests': 0, + 'compressed_requests': 0, + 'total_original_bytes': 0, + 'total_compressed_bytes': 0, + 'total_response_time': 0.0, + 'best_compression_ratio': 0.0 + } + + stats = self.path_stats[path] + stats['requests'] += 1 + stats['total_original_bytes'] += metric.original_size + stats['total_compressed_bytes'] += metric.compressed_size + stats['total_response_time'] += metric.response_time + + if metric.was_compressed: + stats['compressed_requests'] += 1 + stats['best_compression_ratio'] = max( + stats['best_compression_ratio'], + metric.compression_ratio + ) + + def _update_content_type_stats(self, metric: CompressionMetrics): + """更新內容類型統計""" + content_type = metric.content_type or 'unknown' + if content_type not in self.content_type_stats: + self.content_type_stats[content_type] = { + 'requests': 0, + 'compressed_requests': 0, + 'total_original_bytes': 0, + 'total_compressed_bytes': 0, + 'average_compression_ratio': 0.0 + } + + stats = self.content_type_stats[content_type] + stats['requests'] += 1 + stats['total_original_bytes'] += metric.original_size + stats['total_compressed_bytes'] += metric.compressed_size + + if metric.was_compressed: + stats['compressed_requests'] += 1 + + # 重新計算平均壓縮比 + if stats['total_original_bytes'] > 0: + stats['average_compression_ratio'] = ( + 1 - stats['total_compressed_bytes'] / stats['total_original_bytes'] + ) * 100 + + def get_summary(self, time_window: Optional[timedelta] = None) -> CompressionSummary: + """獲取壓縮摘要統計""" + with self.lock: + metrics = self.metrics + + # 如果指定時間窗口,過濾數據 + if time_window: + cutoff_time = datetime.now() - time_window + metrics = [m for m in metrics if m.timestamp >= cutoff_time] + + if not metrics: + return CompressionSummary() + + total_requests = len(metrics) + compressed_requests = sum(1 for m in metrics if m.was_compressed) + total_original_bytes = sum(m.original_size for m in metrics) + total_compressed_bytes = sum(m.compressed_size for m in metrics) + total_response_time = sum(m.response_time for m in metrics) + + # 計算統計數據 + compression_percentage = (compressed_requests / total_requests * 100) if total_requests > 0 else 0 + average_compression_ratio = 0.0 + bandwidth_saved = 0 + + if total_original_bytes > 0: + average_compression_ratio = (1 - total_compressed_bytes / total_original_bytes) * 100 + bandwidth_saved = total_original_bytes - total_compressed_bytes + + average_response_time = total_response_time / total_requests if total_requests > 0 else 0 + + # 獲取壓縮效果最好的路徑 + top_compressed_paths = self._get_top_compressed_paths() + + return CompressionSummary( + total_requests=total_requests, + compressed_requests=compressed_requests, + total_original_bytes=total_original_bytes, + total_compressed_bytes=total_compressed_bytes, + average_compression_ratio=average_compression_ratio, + average_response_time=average_response_time, + compression_percentage=compression_percentage, + bandwidth_saved=bandwidth_saved, + top_compressed_paths=top_compressed_paths + ) + + def _get_top_compressed_paths(self, limit: int = 5) -> List[Tuple[str, float]]: + """獲取壓縮效果最好的路徑""" + path_ratios = [] + + for path, stats in self.path_stats.items(): + if stats['compressed_requests'] > 0 and stats['total_original_bytes'] > 0: + compression_ratio = ( + 1 - stats['total_compressed_bytes'] / stats['total_original_bytes'] + ) * 100 + path_ratios.append((path, compression_ratio)) + + # 按壓縮比排序 + path_ratios.sort(key=lambda x: x[1], reverse=True) + return path_ratios[:limit] + + def get_path_stats(self) -> Dict[str, Dict]: + """獲取路徑統計""" + with self.lock: + return self.path_stats.copy() + + def get_content_type_stats(self) -> Dict[str, Dict]: + """獲取內容類型統計""" + with self.lock: + return self.content_type_stats.copy() + + def get_recent_metrics(self, limit: int = 100) -> List[CompressionMetrics]: + """獲取最近的指標數據""" + with self.lock: + return self.metrics[-limit:] if self.metrics else [] + + def reset_stats(self): + """重置統計數據""" + with self.lock: + self.metrics.clear() + self.path_stats.clear() + self.content_type_stats.clear() + self._start_time = datetime.now() + + def export_stats(self) -> Dict: + """導出統計數據為字典格式""" + summary = self.get_summary() + + return { + 'summary': { + 'total_requests': summary.total_requests, + 'compressed_requests': summary.compressed_requests, + 'compression_percentage': round(summary.compression_percentage, 2), + 'average_compression_ratio': round(summary.average_compression_ratio, 2), + 'bandwidth_saved_mb': round(summary.bandwidth_saved / (1024 * 1024), 2), + 'average_response_time_ms': round(summary.average_response_time * 1000, 2), + 'monitoring_duration_hours': round( + (datetime.now() - self._start_time).total_seconds() / 3600, 2 + ) + }, + 'top_compressed_paths': [ + {'path': path, 'compression_ratio': round(ratio, 2)} + for path, ratio in summary.top_compressed_paths + ], + 'path_stats': { + path: { + 'requests': stats['requests'], + 'compression_percentage': round( + stats['compressed_requests'] / stats['requests'] * 100, 2 + ) if stats['requests'] > 0 else 0, + 'average_response_time_ms': round( + stats['total_response_time'] / stats['requests'] * 1000, 2 + ) if stats['requests'] > 0 else 0, + 'bandwidth_saved_kb': round( + (stats['total_original_bytes'] - stats['total_compressed_bytes']) / 1024, 2 + ) + } + for path, stats in self.path_stats.items() + }, + 'content_type_stats': { + content_type: { + 'requests': stats['requests'], + 'compression_percentage': round( + stats['compressed_requests'] / stats['requests'] * 100, 2 + ) if stats['requests'] > 0 else 0, + 'average_compression_ratio': round(stats['average_compression_ratio'], 2) + } + for content_type, stats in self.content_type_stats.items() + } + } + + +# 全域監控器實例 +_compression_monitor: Optional[CompressionMonitor] = None + + +def get_compression_monitor() -> CompressionMonitor: + """獲取全域壓縮監控器實例""" + global _compression_monitor + if _compression_monitor is None: + _compression_monitor = CompressionMonitor() + return _compression_monitor diff --git a/src/mcp_feedback_enhanced/web/utils/port_manager.py b/src/mcp_feedback_enhanced/web/utils/port_manager.py new file mode 100644 index 0000000..635909d --- /dev/null +++ b/src/mcp_feedback_enhanced/web/utils/port_manager.py @@ -0,0 +1,307 @@ +""" +端口管理工具模組 + +提供增強的端口管理功能,包括: +- 智能端口查找 +- 進程檢測和清理 +- 端口衝突解決 +""" + +import socket +import subprocess +import platform +import psutil +import time +from typing import Optional, Dict, Any, List +from ...debug import debug_log + + +class PortManager: + """端口管理器 - 提供增強的端口管理功能""" + + @staticmethod + def find_process_using_port(port: int) -> Optional[Dict[str, Any]]: + """ + 查找占用指定端口的進程 + + Args: + port: 要檢查的端口號 + + Returns: + Dict[str, Any]: 進程信息字典,包含 pid, name, cmdline 等 + None: 如果沒有進程占用該端口 + """ + try: + for conn in psutil.net_connections(kind='inet'): + if conn.laddr.port == port and conn.status == psutil.CONN_LISTEN: + try: + process = psutil.Process(conn.pid) + return { + 'pid': conn.pid, + 'name': process.name(), + 'cmdline': ' '.join(process.cmdline()), + 'create_time': process.create_time(), + 'status': process.status() + } + except (psutil.NoSuchProcess, psutil.AccessDenied): + # 進程可能已經結束或無權限訪問 + continue + except Exception as e: + debug_log(f"查找端口 {port} 占用進程時發生錯誤: {e}") + + return None + + @staticmethod + def kill_process_on_port(port: int, force: bool = False) -> bool: + """ + 終止占用指定端口的進程 + + Args: + port: 要清理的端口號 + force: 是否強制終止進程 + + Returns: + bool: 是否成功終止進程 + """ + process_info = PortManager.find_process_using_port(port) + if not process_info: + debug_log(f"端口 {port} 沒有被任何進程占用") + return True + + try: + pid = process_info['pid'] + process = psutil.Process(pid) + process_name = process_info['name'] + + debug_log(f"發現進程 {process_name} (PID: {pid}) 占用端口 {port}") + + # 檢查是否是自己的進程(避免誤殺) + if 'mcp-feedback-enhanced' in process_info['cmdline'].lower(): + debug_log(f"檢測到 MCP Feedback Enhanced 相關進程,嘗試優雅終止") + + if force: + debug_log(f"強制終止進程 {process_name} (PID: {pid})") + process.kill() + else: + debug_log(f"優雅終止進程 {process_name} (PID: {pid})") + process.terminate() + + # 等待進程結束 + try: + process.wait(timeout=5) + debug_log(f"成功終止進程 {process_name} (PID: {pid})") + return True + except psutil.TimeoutExpired: + if not force: + debug_log(f"優雅終止超時,強制終止進程 {process_name} (PID: {pid})") + process.kill() + process.wait(timeout=3) + return True + else: + debug_log(f"強制終止進程 {process_name} (PID: {pid}) 失敗") + return False + + except (psutil.NoSuchProcess, psutil.AccessDenied) as e: + debug_log(f"無法終止進程 (PID: {process_info['pid']}): {e}") + return False + except Exception as e: + debug_log(f"終止端口 {port} 占用進程時發生錯誤: {e}") + return False + + @staticmethod + def is_port_available(host: str, port: int) -> bool: + """ + 檢查端口是否可用 + + Args: + host: 主機地址 + port: 端口號 + + Returns: + bool: 端口是否可用 + """ + try: + # 首先嘗試不使用 SO_REUSEADDR 來檢測端口 + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.bind((host, port)) + return True + except OSError: + # 如果綁定失敗,再檢查是否真的有進程在監聽 + # 使用 psutil 檢查是否有進程在監聽該端口 + try: + import psutil + for conn in psutil.net_connections(kind='inet'): + if (conn.laddr.port == port and + conn.laddr.ip in [host, '0.0.0.0', '::'] and + conn.status == psutil.CONN_LISTEN): + return False + # 沒有找到監聽的進程,可能是臨時占用,認為可用 + return True + except Exception: + # 如果 psutil 檢查失敗,保守地認為端口不可用 + return False + + @staticmethod + def find_free_port_enhanced( + preferred_port: int = 8765, + auto_cleanup: bool = True, + host: str = "127.0.0.1", + max_attempts: int = 100 + ) -> int: + """ + 增強的端口查找功能 + + Args: + preferred_port: 偏好端口號 + auto_cleanup: 是否自動清理占用端口的進程 + host: 主機地址 + max_attempts: 最大嘗試次數 + + Returns: + int: 可用的端口號 + + Raises: + RuntimeError: 如果找不到可用端口 + """ + # 首先嘗試偏好端口 + if PortManager.is_port_available(host, preferred_port): + debug_log(f"偏好端口 {preferred_port} 可用") + return preferred_port + + # 如果偏好端口被占用且啟用自動清理 + if auto_cleanup: + debug_log(f"偏好端口 {preferred_port} 被占用,嘗試清理占用進程") + process_info = PortManager.find_process_using_port(preferred_port) + + if process_info: + debug_log(f"端口 {preferred_port} 被進程 {process_info['name']} (PID: {process_info['pid']}) 占用") + + # 詢問用戶是否清理(在實際使用中可能需要配置選項) + if PortManager._should_cleanup_process(process_info): + if PortManager.kill_process_on_port(preferred_port): + # 等待一下讓端口釋放 + time.sleep(1) + if PortManager.is_port_available(host, preferred_port): + debug_log(f"成功清理端口 {preferred_port},現在可用") + return preferred_port + + # 如果偏好端口仍不可用,尋找其他端口 + debug_log(f"偏好端口 {preferred_port} 不可用,尋找其他可用端口") + + for i in range(max_attempts): + port = preferred_port + i + 1 + if PortManager.is_port_available(host, port): + debug_log(f"找到可用端口: {port}") + return port + + # 如果向上查找失敗,嘗試向下查找 + for i in range(1, min(preferred_port - 1024, max_attempts)): + port = preferred_port - i + if port < 1024: # 避免使用系統保留端口 + break + if PortManager.is_port_available(host, port): + debug_log(f"找到可用端口: {port}") + return port + + raise RuntimeError( + f"無法在 {preferred_port}±{max_attempts} 範圍內找到可用端口。" + f"請檢查是否有過多進程占用端口,或手動指定其他端口。" + ) + + @staticmethod + def _should_cleanup_process(process_info: Dict[str, Any]) -> bool: + """ + 判斷是否應該清理指定進程 + + Args: + process_info: 進程信息字典 + + Returns: + bool: 是否應該清理該進程 + """ + # 檢查是否是 MCP Feedback Enhanced 相關進程 + cmdline = process_info.get('cmdline', '').lower() + process_name = process_info.get('name', '').lower() + + # 如果是自己的進程,允許清理 + if any(keyword in cmdline for keyword in ['mcp-feedback-enhanced', 'mcp_feedback_enhanced']): + return True + + # 如果是 Python 進程且命令行包含相關關鍵字 + if 'python' in process_name and any(keyword in cmdline for keyword in ['uvicorn', 'fastapi']): + return True + + # 其他情況下,為了安全起見,不自動清理 + debug_log(f"進程 {process_info['name']} (PID: {process_info['pid']}) 不是 MCP 相關進程,跳過自動清理") + return False + + @staticmethod + def get_port_status(port: int, host: str = "127.0.0.1") -> Dict[str, Any]: + """ + 獲取端口狀態信息 + + Args: + port: 端口號 + host: 主機地址 + + Returns: + Dict[str, Any]: 端口狀態信息 + """ + status = { + 'port': port, + 'host': host, + 'available': False, + 'process': None, + 'error': None + } + + try: + # 檢查端口是否可用 + status['available'] = PortManager.is_port_available(host, port) + + # 如果不可用,查找占用進程 + if not status['available']: + status['process'] = PortManager.find_process_using_port(port) + + except Exception as e: + status['error'] = str(e) + debug_log(f"獲取端口 {port} 狀態時發生錯誤: {e}") + + return status + + @staticmethod + def list_listening_ports(start_port: int = 8000, end_port: int = 9000) -> List[Dict[str, Any]]: + """ + 列出指定範圍內正在監聽的端口 + + Args: + start_port: 起始端口 + end_port: 結束端口 + + Returns: + List[Dict[str, Any]]: 監聽端口列表 + """ + listening_ports = [] + + try: + for conn in psutil.net_connections(kind='inet'): + if (conn.status == psutil.CONN_LISTEN and + start_port <= conn.laddr.port <= end_port): + + try: + process = psutil.Process(conn.pid) + port_info = { + 'port': conn.laddr.port, + 'host': conn.laddr.ip, + 'pid': conn.pid, + 'process_name': process.name(), + 'cmdline': ' '.join(process.cmdline()) + } + listening_ports.append(port_info) + except (psutil.NoSuchProcess, psutil.AccessDenied): + continue + + except Exception as e: + debug_log(f"列出監聽端口時發生錯誤: {e}") + + return listening_ports diff --git a/tests/test_error_handler.py b/tests/test_error_handler.py new file mode 100644 index 0000000..3a973ac --- /dev/null +++ b/tests/test_error_handler.py @@ -0,0 +1,253 @@ +""" +錯誤處理框架測試模組 + +測試 ErrorHandler 類的各項功能,包括: +- 錯誤類型自動分類 +- 用戶友好錯誤信息生成 +- 國際化支持 +- 錯誤上下文記錄 +""" + +import pytest +import sys +import os +from unittest.mock import patch, MagicMock + +# 添加 src 目錄到 Python 路徑 +sys.path.insert(0, 'src') + +from mcp_feedback_enhanced.utils.error_handler import ErrorHandler, ErrorType, ErrorSeverity + + +class TestErrorHandler: + """錯誤處理器測試類""" + + def test_classify_error_network(self): + """測試網絡錯誤分類""" + # 測試 ConnectionError + error = ConnectionError("Connection failed") + assert ErrorHandler.classify_error(error) == ErrorType.NETWORK + + # 測試包含網絡關鍵字的錯誤(不包含 timeout) + error = Exception("socket connection failed") + assert ErrorHandler.classify_error(error) == ErrorType.NETWORK + + def test_classify_error_file_io(self): + """測試文件 I/O 錯誤分類""" + # 測試 FileNotFoundError + error = FileNotFoundError("No such file or directory") + assert ErrorHandler.classify_error(error) == ErrorType.FILE_IO + + # 測試包含文件關鍵字的錯誤(不包含權限關鍵字) + error = Exception("file not found") + assert ErrorHandler.classify_error(error) == ErrorType.FILE_IO + + def test_classify_error_timeout(self): + """測試超時錯誤分類""" + error = TimeoutError("Operation timed out") + assert ErrorHandler.classify_error(error) == ErrorType.TIMEOUT + + error = Exception("timeout occurred") + assert ErrorHandler.classify_error(error) == ErrorType.TIMEOUT + + def test_classify_error_permission(self): + """測試權限錯誤分類""" + error = PermissionError("Access denied") + assert ErrorHandler.classify_error(error) == ErrorType.PERMISSION + + error = Exception("access denied") + assert ErrorHandler.classify_error(error) == ErrorType.PERMISSION + + def test_classify_error_validation(self): + """測試驗證錯誤分類""" + error = ValueError("Invalid value") + assert ErrorHandler.classify_error(error) == ErrorType.VALIDATION + + error = TypeError("Wrong type") + assert ErrorHandler.classify_error(error) == ErrorType.VALIDATION + + def test_classify_error_default_system(self): + """測試默認系統錯誤分類""" + error = Exception("Some completely unknown issue") + assert ErrorHandler.classify_error(error) == ErrorType.SYSTEM + + def test_format_user_error_basic(self): + """測試基本用戶友好錯誤信息生成""" + error = ConnectionError("Connection failed") + result = ErrorHandler.format_user_error(error) + + assert "❌" in result + assert "網絡連接出現問題" in result or "网络连接出现问题" in result or "Network connection issue" in result + + def test_format_user_error_with_context(self): + """測試帶上下文的錯誤信息生成""" + error = FileNotFoundError("File not found") + context = { + "operation": "文件讀取", + "file_path": "/path/to/file.txt" + } + + result = ErrorHandler.format_user_error(error, context=context) + + assert "❌" in result + assert "文件讀取" in result or "文件读取" in result or "文件讀取" in result + assert "/path/to/file.txt" in result + + def test_format_user_error_with_technical_details(self): + """測試包含技術細節的錯誤信息""" + error = ValueError("Invalid input") + result = ErrorHandler.format_user_error(error, include_technical=True) + + assert "❌" in result + assert "ValueError" in result + assert "Invalid input" in result + + def test_get_error_solutions(self): + """測試獲取錯誤解決方案""" + solutions = ErrorHandler.get_error_solutions(ErrorType.NETWORK) + + assert isinstance(solutions, list) + assert len(solutions) > 0 + # 應該包含網絡相關的解決方案 + solutions_text = " ".join(solutions).lower() + assert any(keyword in solutions_text for keyword in ["網絡", "网络", "network", "連接", "连接", "connection"]) + + def test_log_error_with_context(self): + """測試帶上下文的錯誤記錄""" + error = Exception("Test error") + context = {"operation": "測試操作", "user": "test_user"} + + error_id = ErrorHandler.log_error_with_context(error, context=context) + + assert isinstance(error_id, str) + assert error_id.startswith("ERR_") + assert len(error_id.split("_")) == 3 # ERR_timestamp_id + + def test_create_error_response(self): + """測試創建標準化錯誤響應""" + error = ConnectionError("Network error") + context = {"operation": "網絡請求"} + + response = ErrorHandler.create_error_response(error, context=context) + + assert isinstance(response, dict) + assert response["success"] is False + assert "error_id" in response + assert "error_type" in response + assert "message" in response + assert response["error_type"] == ErrorType.NETWORK.value + assert "solutions" in response + + def test_create_error_response_for_user(self): + """測試為用戶界面創建錯誤響應""" + error = FileNotFoundError("File not found") + + response = ErrorHandler.create_error_response(error, for_user=True) + + assert response["success"] is False + assert "context" not in response # 用戶界面不應包含技術上下文 + assert "❌" in response["message"] # 應該包含用戶友好的格式 + + @patch('mcp_feedback_enhanced.utils.error_handler.ErrorHandler.get_i18n_error_message') + def test_language_support(self, mock_get_message): + """測試多語言支持""" + error = ConnectionError("Network error") + + # 測試繁體中文 + mock_get_message.return_value = "網絡連接出現問題" + result = ErrorHandler.format_user_error(error) + assert "網絡連接出現問題" in result + + # 測試簡體中文 + mock_get_message.return_value = "网络连接出现问题" + result = ErrorHandler.format_user_error(error) + assert "网络连接出现问题" in result + + # 測試英文 + mock_get_message.return_value = "Network connection issue" + result = ErrorHandler.format_user_error(error) + assert "Network connection issue" in result + + def test_error_severity_logging(self): + """測試錯誤嚴重程度記錄""" + error = Exception("Critical system error") + + # 測試高嚴重程度錯誤 + error_id = ErrorHandler.log_error_with_context( + error, + severity=ErrorSeverity.CRITICAL + ) + + assert isinstance(error_id, str) + assert error_id.startswith("ERR_") + + def test_get_current_language_fallback(self): + """測試語言獲取回退機制""" + # 由於 i18n 系統可能會覆蓋環境變數,我們主要測試函數不會拋出異常 + language = ErrorHandler.get_current_language() + assert isinstance(language, str) + assert len(language) > 0 + + # 測試語言代碼格式 + assert language in ["zh-TW", "zh-CN", "en"] or "-" in language + + def test_i18n_integration(self): + """測試國際化系統集成""" + # 測試當 i18n 系統不可用時的回退 + error_type = ErrorType.NETWORK + + # 測試獲取錯誤信息 + message = ErrorHandler.get_i18n_error_message(error_type) + assert isinstance(message, str) + assert len(message) > 0 + + # 測試獲取解決方案 + solutions = ErrorHandler.get_i18n_error_solutions(error_type) + assert isinstance(solutions, list) + + def test_error_context_preservation(self): + """測試錯誤上下文保存""" + error = Exception("Test error") + context = { + "operation": "測試操作", + "file_path": "/test/path", + "user_id": "test_user", + "timestamp": "2025-01-05" + } + + error_id = ErrorHandler.log_error_with_context(error, context=context) + + # 驗證錯誤 ID 格式 + assert isinstance(error_id, str) + assert error_id.startswith("ERR_") + + # 上下文應該被記錄到調試日誌中(通過 debug_log) + # 這裡我們主要驗證函數不會拋出異常 + + def test_json_rpc_safety(self): + """測試不影響 JSON RPC 通信""" + # 錯誤處理應該只記錄到 stderr(通過 debug_log) + # 不應該影響 stdout 或 JSON RPC 響應 + + error = Exception("Test error for JSON RPC safety") + context = {"operation": "JSON RPC 測試"} + + # 這些操作不應該影響 stdout + error_id = ErrorHandler.log_error_with_context(error, context=context) + user_message = ErrorHandler.format_user_error(error) + response = ErrorHandler.create_error_response(error) + + # 驗證返回值類型正確 + assert isinstance(error_id, str) + assert isinstance(user_message, str) + assert isinstance(response, dict) + + # 驗證不會拋出異常 + assert error_id.startswith("ERR_") + assert "❌" in user_message + assert response["success"] is False + + +if __name__ == '__main__': + # 運行測試 + pytest.main([__file__, '-v']) diff --git a/tests/test_gzip_compression.py b/tests/test_gzip_compression.py new file mode 100644 index 0000000..fc1d337 --- /dev/null +++ b/tests/test_gzip_compression.py @@ -0,0 +1,346 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Gzip 壓縮功能測試 +================ + +測試 FastAPI Gzip 壓縮中間件的功能,包括: +- 壓縮效果驗證 +- WebSocket 兼容性 +- 靜態文件緩存 +- 性能提升測試 +""" + +import pytest +import asyncio +import gzip +import json +from unittest.mock import Mock, patch +from fastapi.testclient import TestClient +from fastapi import FastAPI, Response +from fastapi.middleware.gzip import GZipMiddleware + +from src.mcp_feedback_enhanced.web.utils.compression_config import ( + CompressionConfig, CompressionManager, get_compression_manager +) +from src.mcp_feedback_enhanced.web.utils.compression_monitor import ( + CompressionMonitor, get_compression_monitor +) + + +class TestCompressionConfig: + """測試壓縮配置類""" + + def test_default_config(self): + """測試預設配置""" + config = CompressionConfig() + + assert config.minimum_size == 1000 + assert config.compression_level == 6 + assert config.static_cache_max_age == 3600 + assert config.api_cache_max_age == 0 + assert 'text/html' in config.compressible_types + assert 'application/json' in config.compressible_types + assert '/ws' in config.exclude_paths + + def test_from_env(self): + """測試從環境變數創建配置""" + with patch.dict('os.environ', { + 'MCP_GZIP_MIN_SIZE': '2000', + 'MCP_GZIP_LEVEL': '9', + 'MCP_STATIC_CACHE_AGE': '7200' + }): + config = CompressionConfig.from_env() + + assert config.minimum_size == 2000 + assert config.compression_level == 9 + assert config.static_cache_max_age == 7200 + + def test_should_compress(self): + """測試壓縮判斷邏輯""" + config = CompressionConfig() + + # 應該壓縮的情況 + assert config.should_compress('text/html', 2000) == True + assert config.should_compress('application/json', 1500) == True + + # 不應該壓縮的情況 + assert config.should_compress('text/html', 500) == False # 太小 + assert config.should_compress('image/jpeg', 2000) == False # 不支援的類型 + assert config.should_compress('', 2000) == False # 無內容類型 + + def test_should_exclude_path(self): + """測試路徑排除邏輯""" + config = CompressionConfig() + + assert config.should_exclude_path('/ws') == True + assert config.should_exclude_path('/api/ws') == True + assert config.should_exclude_path('/health') == True + assert config.should_exclude_path('/static/css/style.css') == False + assert config.should_exclude_path('/api/feedback') == False + + def test_get_cache_headers(self): + """測試緩存頭生成""" + config = CompressionConfig() + + # 靜態文件 + static_headers = config.get_cache_headers('/static/css/style.css') + assert 'Cache-Control' in static_headers + assert 'public, max-age=3600' in static_headers['Cache-Control'] + + # API 路徑(預設不緩存) + api_headers = config.get_cache_headers('/api/feedback') + assert 'no-cache' in api_headers['Cache-Control'] + + # 其他路徑 + other_headers = config.get_cache_headers('/feedback') + assert 'no-cache' in other_headers['Cache-Control'] + + +class TestCompressionManager: + """測試壓縮管理器""" + + def test_manager_initialization(self): + """測試管理器初始化""" + manager = CompressionManager() + + assert manager.config is not None + assert manager._stats['requests_total'] == 0 + assert manager._stats['requests_compressed'] == 0 + + def test_update_stats(self): + """測試統計更新""" + manager = CompressionManager() + + # 測試壓縮請求 + manager.update_stats(1000, 600, True) + stats = manager.get_stats() + + assert stats['requests_total'] == 1 + assert stats['requests_compressed'] == 1 + assert stats['bytes_original'] == 1000 + assert stats['bytes_compressed'] == 600 + assert stats['compression_ratio'] == 40.0 # (1000-600)/1000 * 100 + + # 測試未壓縮請求 + manager.update_stats(500, 500, False) + stats = manager.get_stats() + + assert stats['requests_total'] == 2 + assert stats['requests_compressed'] == 1 + assert stats['compression_percentage'] == 50.0 # 1/2 * 100 + + def test_reset_stats(self): + """測試統計重置""" + manager = CompressionManager() + manager.update_stats(1000, 600, True) + + manager.reset_stats() + stats = manager.get_stats() + + assert stats['requests_total'] == 0 + assert stats['requests_compressed'] == 0 + assert stats['compression_ratio'] == 0.0 + + +class TestCompressionMonitor: + """測試壓縮監控器""" + + def test_monitor_initialization(self): + """測試監控器初始化""" + monitor = CompressionMonitor() + + assert monitor.max_metrics == 1000 + assert len(monitor.metrics) == 0 + assert len(monitor.path_stats) == 0 + + def test_record_request(self): + """測試請求記錄""" + monitor = CompressionMonitor() + + monitor.record_request( + path='/static/css/style.css', + original_size=2000, + compressed_size=1200, + response_time=0.05, + content_type='text/css', + was_compressed=True + ) + + assert len(monitor.metrics) == 1 + metric = monitor.metrics[0] + assert metric.path == '/static/css/style.css' + assert metric.compression_ratio == 40.0 # (2000-1200)/2000 * 100 + + # 檢查路徑統計 + path_stats = monitor.get_path_stats() + assert '/static/css/style.css' in path_stats + assert path_stats['/static/css/style.css']['requests'] == 1 + assert path_stats['/static/css/style.css']['compressed_requests'] == 1 + + def test_get_summary(self): + """測試摘要統計""" + monitor = CompressionMonitor() + + # 記錄多個請求 + monitor.record_request('/static/css/style.css', 2000, 1200, 0.05, 'text/css', True) + monitor.record_request('/static/js/app.js', 3000, 1800, 0.08, 'application/javascript', True) + monitor.record_request('/api/feedback', 500, 500, 0.02, 'application/json', False) + + summary = monitor.get_summary() + + assert summary.total_requests == 3 + assert summary.compressed_requests == 2 + assert abs(summary.compression_percentage - 66.67) < 0.01 # 2/3 * 100 (約) + assert summary.bandwidth_saved == 2000 # (2000-1200) + (3000-1800) + 0 = 800 + 1200 + 0 = 2000 + + def test_export_stats(self): + """測試統計導出""" + monitor = CompressionMonitor() + + monitor.record_request('/static/css/style.css', 2000, 1200, 0.05, 'text/css', True) + + exported = monitor.export_stats() + + assert 'summary' in exported + assert 'top_compressed_paths' in exported + assert 'path_stats' in exported + assert 'content_type_stats' in exported + + assert exported['summary']['total_requests'] == 1 + assert exported['summary']['compressed_requests'] == 1 + + +class TestGzipIntegration: + """測試 Gzip 壓縮集成""" + + def create_test_app(self): + """創建測試應用""" + app = FastAPI() + + # 添加 Gzip 中間件 + app.add_middleware(GZipMiddleware, minimum_size=100) + + @app.get("/test-large") + async def test_large(): + # 返回大於最小壓縮大小的內容 + return {"data": "x" * 1000} + + @app.get("/test-small") + async def test_small(): + # 返回小於最小壓縮大小的內容 + return {"data": "small"} + + @app.get("/test-html") + async def test_html(): + html_content = "" + "content " * 100 + "" + return Response(content=html_content, media_type="text/html") + + return app + + def test_gzip_compression_large_content(self): + """測試大內容的 Gzip 壓縮""" + app = self.create_test_app() + client = TestClient(app) + + # 請求壓縮 + response = client.get("/test-large", headers={"Accept-Encoding": "gzip"}) + + assert response.status_code == 200 + assert response.headers.get("content-encoding") == "gzip" + + # 驗證內容正確性 + data = response.json() + assert "data" in data + assert len(data["data"]) == 1000 + + def test_gzip_compression_small_content(self): + """測試小內容不壓縮""" + app = self.create_test_app() + client = TestClient(app) + + response = client.get("/test-small", headers={"Accept-Encoding": "gzip"}) + + assert response.status_code == 200 + # 小內容不應該被壓縮 + assert response.headers.get("content-encoding") != "gzip" + + def test_gzip_compression_html_content(self): + """測試 HTML 內容壓縮""" + app = self.create_test_app() + client = TestClient(app) + + response = client.get("/test-html", headers={"Accept-Encoding": "gzip"}) + + assert response.status_code == 200 + assert response.headers.get("content-encoding") == "gzip" + assert response.headers.get("content-type") == "text/html; charset=utf-8" + + def test_no_compression_without_accept_encoding(self): + """測試不支援壓縮的客戶端""" + app = self.create_test_app() + client = TestClient(app) + + # FastAPI 的 TestClient 預設會添加 Accept-Encoding,所以我們測試明確拒絕壓縮 + response = client.get("/test-large", headers={"Accept-Encoding": "identity"}) + + assert response.status_code == 200 + # 當明確要求不壓縮時,應該不會有 gzip 編碼 + # 注意:某些情況下 FastAPI 仍可能壓縮,這是正常行為 + + +class TestWebSocketCompatibility: + """測試 WebSocket 兼容性""" + + def test_websocket_not_compressed(self): + """測試 WebSocket 連接不受壓縮影響""" + # 這個測試確保 WebSocket 路徑被正確排除 + config = CompressionConfig() + + # WebSocket 路徑應該被排除 + assert config.should_exclude_path('/ws') == True + assert config.should_exclude_path('/api/ws') == True + + # 確保 WebSocket 不會被壓縮配置影響 + assert not config.should_compress('application/json', 1000) or config.should_exclude_path('/ws') + + +@pytest.mark.asyncio +async def test_compression_performance(): + """測試壓縮性能""" + # 創建測試數據 + test_data = {"message": "test " * 1000} # 大約 5KB 的 JSON + json_data = json.dumps(test_data) + + # 手動壓縮測試 + compressed_data = gzip.compress(json_data.encode('utf-8')) + + # 驗證壓縮效果 + original_size = len(json_data.encode('utf-8')) + compressed_size = len(compressed_data) + compression_ratio = (1 - compressed_size / original_size) * 100 + + # 壓縮比應該大於 50%(JSON 數據通常壓縮效果很好) + assert compression_ratio > 50 + assert compressed_size < original_size + + # 驗證解壓縮正確性 + decompressed_data = gzip.decompress(compressed_data).decode('utf-8') + assert decompressed_data == json_data + + +def test_global_instances(): + """測試全域實例""" + # 測試壓縮管理器全域實例 + manager1 = get_compression_manager() + manager2 = get_compression_manager() + assert manager1 is manager2 + + # 測試壓縮監控器全域實例 + monitor1 = get_compression_monitor() + monitor2 = get_compression_monitor() + assert monitor1 is monitor2 + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/tests/test_port_manager.py b/tests/test_port_manager.py new file mode 100644 index 0000000..d1ccc61 --- /dev/null +++ b/tests/test_port_manager.py @@ -0,0 +1,249 @@ +""" +端口管理器測試模組 + +測試 PortManager 類的各項功能,包括: +- 端口可用性檢測 +- 進程查找和清理 +- 增強端口查找 +""" + +import pytest +import socket +import time +import threading +import subprocess +import sys +from unittest.mock import patch, MagicMock + +# 添加 src 目錄到 Python 路徑 +sys.path.insert(0, 'src') + +from mcp_feedback_enhanced.web.utils.port_manager import PortManager + + +class TestPortManager: + """端口管理器測試類""" + + def test_is_port_available_free_port(self): + """測試檢測空閒端口""" + # 找一個肯定空閒的端口 + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('127.0.0.1', 0)) + free_port = s.getsockname()[1] + + # 測試該端口是否被檢測為可用 + assert PortManager.is_port_available('127.0.0.1', free_port) is True + + def test_is_port_available_occupied_port(self): + """測試檢測被占用的端口""" + # 創建一個占用端口的 socket + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server_socket.bind(('127.0.0.1', 0)) + occupied_port = server_socket.getsockname()[1] + server_socket.listen(1) + + try: + # 測試該端口是否被檢測為不可用 + assert PortManager.is_port_available('127.0.0.1', occupied_port) is False + finally: + server_socket.close() + + def test_find_free_port_enhanced_preferred_available(self): + """測試當偏好端口可用時的行為""" + # 找一個空閒端口作為偏好端口 + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('127.0.0.1', 0)) + preferred_port = s.getsockname()[1] + + # 測試是否返回偏好端口 + result_port = PortManager.find_free_port_enhanced( + preferred_port=preferred_port, + auto_cleanup=False + ) + assert result_port == preferred_port + + def test_find_free_port_enhanced_preferred_occupied(self): + """測試當偏好端口被占用時的行為""" + # 創建一個占用端口的 socket + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server_socket.bind(('127.0.0.1', 0)) + occupied_port = server_socket.getsockname()[1] + server_socket.listen(1) + + try: + # 測試是否返回其他可用端口 + result_port = PortManager.find_free_port_enhanced( + preferred_port=occupied_port, + auto_cleanup=False + ) + assert result_port != occupied_port + assert result_port > occupied_port # 應該向上查找 + + # 驗證返回的端口確實可用 + assert PortManager.is_port_available('127.0.0.1', result_port) is True + finally: + server_socket.close() + + def test_find_process_using_port_no_process(self): + """測試查找沒有進程占用的端口""" + # 找一個空閒端口 + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('127.0.0.1', 0)) + free_port = s.getsockname()[1] + + # 測試是否正確返回 None + result = PortManager.find_process_using_port(free_port) + assert result is None + + def test_find_process_using_port_with_process(self): + """測試查找有進程占用的端口""" + # 創建一個簡單的測試服務器 + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server_socket.bind(('127.0.0.1', 0)) + test_port = server_socket.getsockname()[1] + server_socket.listen(1) + + try: + # 測試是否能找到進程信息 + result = PortManager.find_process_using_port(test_port) + + if result: # 如果找到了進程(在某些環境下可能找不到) + assert isinstance(result, dict) + assert 'pid' in result + assert 'name' in result + assert 'cmdline' in result + assert isinstance(result['pid'], int) + assert result['pid'] > 0 + finally: + server_socket.close() + + def test_get_port_status_available(self): + """測試獲取可用端口的狀態""" + # 找一個空閒端口 + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('127.0.0.1', 0)) + free_port = s.getsockname()[1] + + status = PortManager.get_port_status(free_port) + + assert status['port'] == free_port + assert status['host'] == '127.0.0.1' + assert status['available'] is True + assert status['process'] is None + assert status['error'] is None + + def test_get_port_status_occupied(self): + """測試獲取被占用端口的狀態""" + # 創建一個占用端口的 socket + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server_socket.bind(('127.0.0.1', 0)) + occupied_port = server_socket.getsockname()[1] + server_socket.listen(1) + + try: + status = PortManager.get_port_status(occupied_port) + + assert status['port'] == occupied_port + assert status['host'] == '127.0.0.1' + assert status['available'] is False + # process 可能為 None(取決於系統權限) + assert status['error'] is None + finally: + server_socket.close() + + def test_list_listening_ports(self): + """測試列出監聽端口""" + # 創建幾個測試服務器 + servers = [] + test_ports = [] + + try: + for i in range(2): + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server_socket.bind(('127.0.0.1', 0)) + port = server_socket.getsockname()[1] + server_socket.listen(1) + + servers.append(server_socket) + test_ports.append(port) + + # 測試列出監聽端口 + min_port = min(test_ports) - 10 + max_port = max(test_ports) + 10 + + listening_ports = PortManager.list_listening_ports(min_port, max_port) + + # 驗證結果 + assert isinstance(listening_ports, list) + + # 檢查我們的測試端口是否在列表中 + found_ports = [p['port'] for p in listening_ports] + for test_port in test_ports: + if test_port in found_ports: + # 找到了我們的端口,驗證信息完整性 + port_info = next(p for p in listening_ports if p['port'] == test_port) + assert 'host' in port_info + assert 'pid' in port_info + assert 'process_name' in port_info + assert 'cmdline' in port_info + + finally: + # 清理測試服務器 + for server in servers: + server.close() + + @patch('mcp_feedback_enhanced.web.utils.port_manager.psutil.Process') + def test_should_cleanup_process_mcp_process(self, mock_process): + """測試是否應該清理 MCP 相關進程""" + # 模擬 MCP 相關進程 + process_info = { + 'pid': 1234, + 'name': 'python.exe', + 'cmdline': 'python -m mcp-feedback-enhanced test --web', + 'create_time': time.time(), + 'status': 'running' + } + + result = PortManager._should_cleanup_process(process_info) + assert result is True + + @patch('mcp_feedback_enhanced.web.utils.port_manager.psutil.Process') + def test_should_cleanup_process_other_process(self, mock_process): + """測試是否應該清理其他進程""" + # 模擬其他進程 + process_info = { + 'pid': 5678, + 'name': 'chrome.exe', + 'cmdline': 'chrome --new-window', + 'create_time': time.time(), + 'status': 'running' + } + + result = PortManager._should_cleanup_process(process_info) + assert result is False + + def test_find_free_port_enhanced_max_attempts(self): + """測試最大嘗試次數限制""" + # 這個測試比較難實現,因為需要占用大量連續端口 + # 我們只測試參數是否正確傳遞 + try: + result = PortManager.find_free_port_enhanced( + preferred_port=65000, # 使用高端口減少衝突 + auto_cleanup=False, + max_attempts=10 + ) + assert isinstance(result, int) + assert 65000 <= result <= 65535 + except RuntimeError: + # 如果真的找不到端口,這也是正常的 + pass + + +if __name__ == '__main__': + # 運行測試 + pytest.main([__file__, '-v']) diff --git a/tests/test_resource_manager.py b/tests/test_resource_manager.py new file mode 100644 index 0000000..bbe10ab --- /dev/null +++ b/tests/test_resource_manager.py @@ -0,0 +1,394 @@ +""" +資源管理器測試模組 + +測試 ResourceManager 類的各項功能,包括: +- 臨時文件和目錄管理 +- 進程註冊和清理 +- 自動清理機制 +- 資源統計和監控 +""" + +import pytest +import os +import sys +import time +import tempfile +import subprocess +import threading +from pathlib import Path +from unittest.mock import patch, MagicMock + +# 添加 src 目錄到 Python 路徑 +sys.path.insert(0, 'src') + +from mcp_feedback_enhanced.utils.resource_manager import ( + ResourceManager, + get_resource_manager, + create_temp_file, + create_temp_dir, + register_process, + cleanup_all_resources +) + + +class TestResourceManager: + """資源管理器測試類""" + + def setup_method(self): + """每個測試方法前的設置""" + # 重置單例實例 + ResourceManager._instance = None + + def test_singleton_pattern(self): + """測試單例模式""" + rm1 = ResourceManager() + rm2 = ResourceManager() + rm3 = get_resource_manager() + + assert rm1 is rm2 + assert rm2 is rm3 + assert id(rm1) == id(rm2) == id(rm3) + + def test_create_temp_file(self): + """測試創建臨時文件""" + rm = get_resource_manager() + + # 測試基本創建 + temp_file = rm.create_temp_file(suffix=".txt", prefix="test_") + + assert isinstance(temp_file, str) + assert os.path.exists(temp_file) + assert temp_file.endswith(".txt") + assert "test_" in os.path.basename(temp_file) + assert temp_file in rm.temp_files + + # 清理 + os.remove(temp_file) + + def test_create_temp_dir(self): + """測試創建臨時目錄""" + rm = get_resource_manager() + + # 測試基本創建 + temp_dir = rm.create_temp_dir(suffix="_test", prefix="test_") + + assert isinstance(temp_dir, str) + assert os.path.exists(temp_dir) + assert os.path.isdir(temp_dir) + assert temp_dir.endswith("_test") + assert "test_" in os.path.basename(temp_dir) + assert temp_dir in rm.temp_dirs + + # 清理 + os.rmdir(temp_dir) + + def test_convenience_functions(self): + """測試便捷函數""" + # 測試 create_temp_file 便捷函數 + temp_file = create_temp_file(suffix=".log", prefix="conv_") + assert isinstance(temp_file, str) + assert os.path.exists(temp_file) + assert temp_file.endswith(".log") + + # 測試 create_temp_dir 便捷函數 + temp_dir = create_temp_dir(suffix="_conv", prefix="conv_") + assert isinstance(temp_dir, str) + assert os.path.exists(temp_dir) + assert os.path.isdir(temp_dir) + + # 清理 + os.remove(temp_file) + os.rmdir(temp_dir) + + def test_register_process_with_popen(self): + """測試註冊 Popen 進程""" + rm = get_resource_manager() + + # 創建一個簡單的進程 + process = subprocess.Popen( + ["python", "-c", "import time; time.sleep(0.1)"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + + # 註冊進程 + pid = rm.register_process(process, description="測試進程") + + assert pid == process.pid + assert pid in rm.processes + assert rm.processes[pid]["description"] == "測試進程" + assert rm.processes[pid]["process"] is process + + # 等待進程結束 + process.wait() + + def test_register_process_with_pid(self): + """測試註冊 PID""" + rm = get_resource_manager() + + # 使用當前進程的 PID + current_pid = os.getpid() + + # 註冊 PID + registered_pid = rm.register_process(current_pid, description="當前進程") + + assert registered_pid == current_pid + assert current_pid in rm.processes + assert rm.processes[current_pid]["description"] == "當前進程" + assert rm.processes[current_pid]["process"] is None + + def test_unregister_temp_file(self): + """測試取消臨時文件追蹤""" + rm = get_resource_manager() + + # 創建臨時文件 + temp_file = rm.create_temp_file() + assert temp_file in rm.temp_files + + # 取消追蹤 + result = rm.unregister_temp_file(temp_file) + assert result is True + assert temp_file not in rm.temp_files + + # 再次取消追蹤(應該返回 False) + result = rm.unregister_temp_file(temp_file) + assert result is False + + # 清理 + if os.path.exists(temp_file): + os.remove(temp_file) + + def test_unregister_process(self): + """測試取消進程追蹤""" + rm = get_resource_manager() + + # 註冊進程 + current_pid = os.getpid() + rm.register_process(current_pid, description="測試進程") + assert current_pid in rm.processes + + # 取消追蹤 + result = rm.unregister_process(current_pid) + assert result is True + assert current_pid not in rm.processes + + # 再次取消追蹤(應該返回 False) + result = rm.unregister_process(current_pid) + assert result is False + + def test_cleanup_temp_files(self): + """測試清理臨時文件""" + rm = get_resource_manager() + + # 創建多個臨時文件 + temp_files = [] + for i in range(3): + temp_file = rm.create_temp_file(prefix=f"cleanup_test_{i}_") + temp_files.append(temp_file) + + # 確認文件都存在 + for temp_file in temp_files: + assert os.path.exists(temp_file) + assert temp_file in rm.temp_files + + # 執行清理(max_age=0 清理所有文件) + cleaned_count = rm.cleanup_temp_files(max_age=0) + + assert cleaned_count == 3 + for temp_file in temp_files: + assert not os.path.exists(temp_file) + assert temp_file not in rm.temp_files + + def test_cleanup_temp_dirs(self): + """測試清理臨時目錄""" + rm = get_resource_manager() + + # 創建多個臨時目錄 + temp_dirs = [] + for i in range(2): + temp_dir = rm.create_temp_dir(prefix=f"cleanup_test_{i}_") + temp_dirs.append(temp_dir) + + # 確認目錄都存在 + for temp_dir in temp_dirs: + assert os.path.exists(temp_dir) + assert temp_dir in rm.temp_dirs + + # 執行清理 + cleaned_count = rm.cleanup_temp_dirs() + + assert cleaned_count == 2 + for temp_dir in temp_dirs: + assert not os.path.exists(temp_dir) + assert temp_dir not in rm.temp_dirs + + def test_cleanup_all(self): + """測試全面清理""" + rm = get_resource_manager() + + # 創建各種資源 + temp_file = rm.create_temp_file(prefix="cleanup_all_") + temp_dir = rm.create_temp_dir(prefix="cleanup_all_") + + # 註冊進程 + current_pid = os.getpid() + rm.register_process(current_pid, description="測試進程", auto_cleanup=False) + + # 執行全面清理 + results = rm.cleanup_all() + + assert isinstance(results, dict) + assert "temp_files" in results + assert "temp_dirs" in results + assert "processes" in results + assert "file_handles" in results + + # 檢查文件和目錄是否被清理 + assert not os.path.exists(temp_file) + assert not os.path.exists(temp_dir) + assert temp_file not in rm.temp_files + assert temp_dir not in rm.temp_dirs + + # 進程不應該被清理(auto_cleanup=False) + assert current_pid in rm.processes + + def test_get_resource_stats(self): + """測試獲取資源統計""" + rm = get_resource_manager() + + # 創建一些資源 + temp_file = rm.create_temp_file() + temp_dir = rm.create_temp_dir() + rm.register_process(os.getpid(), description="統計測試") + + # 獲取統計 + stats = rm.get_resource_stats() + + assert isinstance(stats, dict) + assert "current_temp_files" in stats + assert "current_temp_dirs" in stats + assert "current_processes" in stats + assert "temp_files_created" in stats + assert "temp_dirs_created" in stats + assert "auto_cleanup_enabled" in stats + + assert stats["current_temp_files"] >= 1 + assert stats["current_temp_dirs"] >= 1 + assert stats["current_processes"] >= 1 + + # 清理 + os.remove(temp_file) + os.rmdir(temp_dir) + + def test_get_detailed_info(self): + """測試獲取詳細信息""" + rm = get_resource_manager() + + # 創建一些資源 + temp_file = rm.create_temp_file(prefix="detail_test_") + rm.register_process(os.getpid(), description="詳細信息測試") + + # 獲取詳細信息 + info = rm.get_detailed_info() + + assert isinstance(info, dict) + assert "temp_files" in info + assert "temp_dirs" in info + assert "processes" in info + assert "stats" in info + + assert temp_file in info["temp_files"] + assert os.getpid() in info["processes"] + assert info["processes"][os.getpid()]["description"] == "詳細信息測試" + + # 清理 + os.remove(temp_file) + + def test_configure(self): + """測試配置功能""" + rm = get_resource_manager() + + # 測試配置更新 + rm.configure( + auto_cleanup_enabled=False, + cleanup_interval=120, + temp_file_max_age=1800 + ) + + assert rm.auto_cleanup_enabled is False + assert rm.cleanup_interval == 120 + assert rm.temp_file_max_age == 1800 + + # 測試最小值限制 + rm.configure( + cleanup_interval=30, # 小於最小值 60 + temp_file_max_age=100 # 小於最小值 300 + ) + + assert rm.cleanup_interval == 60 # 應該被限制為最小值 + assert rm.temp_file_max_age == 300 # 應該被限制為最小值 + + def test_cleanup_all_convenience_function(self): + """測試全面清理便捷函數""" + # 創建一些資源 + temp_file = create_temp_file(prefix="conv_cleanup_") + temp_dir = create_temp_dir(prefix="conv_cleanup_") + + # 執行清理 + results = cleanup_all_resources() + + assert isinstance(results, dict) + assert not os.path.exists(temp_file) + assert not os.path.exists(temp_dir) + + def test_error_handling(self): + """測試錯誤處理""" + rm = get_resource_manager() + + # 測試創建臨時文件時的錯誤處理 + with patch('tempfile.mkstemp', side_effect=OSError("Mock error")): + with pytest.raises(OSError): + rm.create_temp_file() + + # 測試創建臨時目錄時的錯誤處理 + with patch('tempfile.mkdtemp', side_effect=OSError("Mock error")): + with pytest.raises(OSError): + rm.create_temp_dir() + + def test_file_handle_registration(self): + """測試文件句柄註冊""" + rm = get_resource_manager() + + # 創建一個文件句柄 + temp_file = rm.create_temp_file() + with open(temp_file, 'w') as f: + f.write("test") + rm.register_file_handle(f) + + # 檢查是否註冊成功 + assert len(rm.file_handles) > 0 + + # 清理 + os.remove(temp_file) + + def test_auto_cleanup_thread(self): + """測試自動清理線程""" + rm = get_resource_manager() + + # 確保自動清理已啟動 + assert rm.auto_cleanup_enabled is True + assert rm._cleanup_thread is not None + assert rm._cleanup_thread.is_alive() + + # 測試停止自動清理 + rm.stop_auto_cleanup() + assert rm._cleanup_thread is None + + # 重新啟動 + rm.configure(auto_cleanup_enabled=True) + assert rm._cleanup_thread is not None + + +if __name__ == '__main__': + # 運行測試 + pytest.main([__file__, '-v'])