From b9d781f147234fb39ab9d98011689b2418a6e1f3 Mon Sep 17 00:00:00 2001 From: Minidoracat Date: Sat, 7 Jun 2025 02:54:46 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20=E6=96=B0=E5=A2=9E=E6=9C=83?= =?UTF-8?q?=E8=A9=B1=E6=B8=85=E7=90=86=E7=AE=A1=E7=90=86=E5=99=A8=EF=BC=8C?= =?UTF-8?q?=E6=95=B4=E5=90=88=E8=87=AA=E5=8B=95=E6=B8=85=E7=90=86=E7=AD=96?= =?UTF-8?q?=E7=95=A5=E3=80=81=E7=B5=B1=E8=A8=88=E8=88=87=E6=80=A7=E8=83=BD?= =?UTF-8?q?=E7=9B=A3=E6=8E=A7=EF=BC=8C=E6=8F=90=E5=8D=87=E7=B3=BB=E7=B5=B1?= =?UTF-8?q?=E8=B3=87=E6=BA=90=E7=AE=A1=E7=90=86=E6=95=88=E7=8E=87=E3=80=82?= =?UTF-8?q?=E6=93=B4=E5=B1=95=20WebFeedbackSession=20=E5=8F=8A=20SessionCl?= =?UTF-8?q?eanupManager=20=E5=8A=9F=E8=83=BD=EF=BC=8C=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E9=81=8E=E6=9C=9F=E8=88=87=E5=85=A7=E5=AD=98=E5=A3=93=E5=8A=9B?= =?UTF-8?q?=E6=B8=85=E7=90=86=EF=BC=8C=E4=B8=A6=E6=96=B0=E5=A2=9E=E6=B8=AC?= =?UTF-8?q?=E8=A9=A6=E6=A8=A1=E7=B5=84=E4=BB=A5=E7=A2=BA=E4=BF=9D=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=E6=AD=A3=E7=A2=BA=E6=80=A7=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/mcp_feedback_enhanced/web/main.py | 219 +++++++- .../web/models/__init__.py | 6 +- .../web/models/feedback_session.py | 451 ++++++++++++++-- .../web/utils/session_cleanup_manager.py | 504 ++++++++++++++++++ tests/test_session_cleanup.py | 366 +++++++++++++ 5 files changed, 1481 insertions(+), 65 deletions(-) create mode 100644 src/mcp_feedback_enhanced/web/utils/session_cleanup_manager.py create mode 100644 tests/test_session_cleanup.py diff --git a/src/mcp_feedback_enhanced/web/main.py b/src/mcp_feedback_enhanced/web/main.py index 05b50fc..f4aa907 100644 --- a/src/mcp_feedback_enhanced/web/main.py +++ b/src/mcp_feedback_enhanced/web/main.py @@ -17,7 +17,8 @@ import threading import time import webbrowser from pathlib import Path -from typing import Dict, Optional +from typing import Dict, Optional, List +from datetime import datetime import uuid from fastapi import FastAPI, Request, Response @@ -26,7 +27,7 @@ from fastapi.templating import Jinja2Templates from fastapi.middleware.gzip import GZipMiddleware import uvicorn -from .models import WebFeedbackSession, FeedbackResult +from .models import WebFeedbackSession, FeedbackResult, CleanupReason, SessionStatus from .routes import setup_routes from .utils import find_free_port, get_browser_opener from .utils.port_manager import PortManager @@ -85,6 +86,17 @@ class WebUIManager: # 會話更新通知標記 self._pending_session_update = False + # 會話清理統計 + self.cleanup_stats = { + "total_cleanups": 0, + "expired_cleanups": 0, + "memory_pressure_cleanups": 0, + "manual_cleanups": 0, + "last_cleanup_time": None, + "total_cleanup_duration": 0.0, + "sessions_cleaned": 0 + } + self.server_thread = None self.server_process = None self.i18n = get_i18n_manager() @@ -148,16 +160,46 @@ class WebUIManager: # 添加 Web 應用特定的警告回調 def web_memory_alert(alert): debug_log(f"Web UI 內存警告 [{alert.level}]: {alert.message}") - # 可以在這裡添加更多 Web 特定的處理邏輯 - # 例如:通過 WebSocket 通知前端、記錄到特定日誌等 + + # 根據警告級別觸發不同的清理策略 + if alert.level == "critical": + # 危險級別:清理過期會話 + cleaned = self.cleanup_expired_sessions() + debug_log(f"內存危險警告觸發,清理了 {cleaned} 個過期會話") + elif alert.level == "emergency": + # 緊急級別:強制清理會話 + cleaned = self.cleanup_sessions_by_memory_pressure(force=True) + debug_log(f"內存緊急警告觸發,強制清理了 {cleaned} 個會話") self.memory_monitor.add_alert_callback(web_memory_alert) + # 添加會話清理回調到內存監控 + def session_cleanup_callback(force: bool = False): + """內存監控觸發的會話清理回調""" + try: + if force: + # 強制清理:包括內存壓力清理 + cleaned = self.cleanup_sessions_by_memory_pressure(force=True) + debug_log(f"內存監控強制清理了 {cleaned} 個會話") + else: + # 常規清理:只清理過期會話 + cleaned = self.cleanup_expired_sessions() + debug_log(f"內存監控清理了 {cleaned} 個過期會話") + except Exception as e: + error_id = ErrorHandler.log_error_with_context( + e, + context={"operation": "內存監控會話清理", "force": force}, + error_type=ErrorType.SYSTEM + ) + debug_log(f"內存監控會話清理失敗 [錯誤ID: {error_id}]: {e}") + + self.memory_monitor.add_cleanup_callback(session_cleanup_callback) + # 確保內存監控已啟動(ResourceManager 可能已經啟動了) if not self.memory_monitor.is_monitoring: self.memory_monitor.start_monitoring() - debug_log("Web UI 內存監控設置完成") + debug_log("Web UI 內存監控設置完成,已集成會話清理回調") except Exception as e: error_id = ErrorHandler.log_error_with_context( @@ -563,13 +605,176 @@ class WebUIManager: """獲取伺服器 URL""" return f"http://{self.host}:{self.port}" + def cleanup_expired_sessions(self) -> int: + """清理過期會話""" + cleanup_start_time = time.time() + expired_sessions = [] + + # 掃描過期會話 + for session_id, session in self.sessions.items(): + if session.is_expired(): + expired_sessions.append(session_id) + + # 批量清理過期會話 + cleaned_count = 0 + for session_id in expired_sessions: + try: + session = self.sessions.get(session_id) + if session: + # 使用增強清理方法 + session._cleanup_sync_enhanced(CleanupReason.EXPIRED) + del self.sessions[session_id] + cleaned_count += 1 + + # 如果清理的是當前活躍會話,清空當前會話 + if self.current_session and self.current_session.session_id == session_id: + self.current_session = None + debug_log("清空過期的當前活躍會話") + + except Exception as e: + error_id = ErrorHandler.log_error_with_context( + e, + context={"session_id": session_id, "operation": "清理過期會話"}, + error_type=ErrorType.SYSTEM + ) + debug_log(f"清理過期會話 {session_id} 失敗 [錯誤ID: {error_id}]: {e}") + + # 更新統計 + cleanup_duration = time.time() - cleanup_start_time + self.cleanup_stats.update({ + "total_cleanups": self.cleanup_stats["total_cleanups"] + 1, + "expired_cleanups": self.cleanup_stats["expired_cleanups"] + 1, + "last_cleanup_time": datetime.now().isoformat(), + "total_cleanup_duration": self.cleanup_stats["total_cleanup_duration"] + cleanup_duration, + "sessions_cleaned": self.cleanup_stats["sessions_cleaned"] + cleaned_count + }) + + if cleaned_count > 0: + debug_log(f"清理了 {cleaned_count} 個過期會話,耗時: {cleanup_duration:.2f}秒") + + return cleaned_count + + def cleanup_sessions_by_memory_pressure(self, force: bool = False) -> int: + """根據內存壓力清理會話""" + cleanup_start_time = time.time() + sessions_to_clean = [] + + # 根據優先級選擇要清理的會話 + # 優先級:已完成 > 已提交反饋 > 錯誤狀態 > 空閒時間最長 + for session_id, session in self.sessions.items(): + # 跳過當前活躍會話(除非強制清理) + if not force and self.current_session and session.session_id == self.current_session.session_id: + continue + + # 優先清理已完成或錯誤狀態的會話 + if session.status in [SessionStatus.COMPLETED, SessionStatus.ERROR, SessionStatus.TIMEOUT]: + sessions_to_clean.append((session_id, session, 1)) # 高優先級 + elif session.status == SessionStatus.FEEDBACK_SUBMITTED: + # 已提交反饋但空閒時間較長的會話 + if session.get_idle_time() > 300: # 5分鐘空閒 + sessions_to_clean.append((session_id, session, 2)) # 中優先級 + elif session.get_idle_time() > 600: # 10分鐘空閒 + sessions_to_clean.append((session_id, session, 3)) # 低優先級 + + # 按優先級排序 + sessions_to_clean.sort(key=lambda x: x[2]) + + # 清理會話(限制數量避免過度清理) + max_cleanup = min(len(sessions_to_clean), 5 if not force else len(sessions_to_clean)) + cleaned_count = 0 + + for i in range(max_cleanup): + session_id, session, priority = sessions_to_clean[i] + try: + # 使用增強清理方法 + session._cleanup_sync_enhanced(CleanupReason.MEMORY_PRESSURE) + del self.sessions[session_id] + cleaned_count += 1 + + # 如果清理的是當前活躍會話,清空當前會話 + if self.current_session and self.current_session.session_id == session_id: + self.current_session = None + debug_log("因內存壓力清空當前活躍會話") + + except Exception as e: + error_id = ErrorHandler.log_error_with_context( + e, + context={"session_id": session_id, "operation": "內存壓力清理"}, + error_type=ErrorType.SYSTEM + ) + debug_log(f"內存壓力清理會話 {session_id} 失敗 [錯誤ID: {error_id}]: {e}") + + # 更新統計 + cleanup_duration = time.time() - cleanup_start_time + self.cleanup_stats.update({ + "total_cleanups": self.cleanup_stats["total_cleanups"] + 1, + "memory_pressure_cleanups": self.cleanup_stats["memory_pressure_cleanups"] + 1, + "last_cleanup_time": datetime.now().isoformat(), + "total_cleanup_duration": self.cleanup_stats["total_cleanup_duration"] + cleanup_duration, + "sessions_cleaned": self.cleanup_stats["sessions_cleaned"] + cleaned_count + }) + + if cleaned_count > 0: + debug_log(f"因內存壓力清理了 {cleaned_count} 個會話,耗時: {cleanup_duration:.2f}秒") + + return cleaned_count + + def get_session_cleanup_stats(self) -> dict: + """獲取會話清理統計""" + stats = self.cleanup_stats.copy() + stats.update({ + "active_sessions": len(self.sessions), + "current_session_id": self.current_session.session_id if self.current_session else None, + "expired_sessions": sum(1 for s in self.sessions.values() if s.is_expired()), + "idle_sessions": sum(1 for s in self.sessions.values() if s.get_idle_time() > 300), + "memory_usage_mb": 0 # 將在下面計算 + }) + + # 計算內存使用(如果可能) + try: + import psutil + process = psutil.Process() + stats["memory_usage_mb"] = round(process.memory_info().rss / (1024 * 1024), 2) + except: + pass + + return stats + + def _scan_expired_sessions(self) -> List[str]: + """掃描過期會話ID列表""" + expired_sessions = [] + for session_id, session in self.sessions.items(): + if session.is_expired(): + expired_sessions.append(session_id) + return expired_sessions + def stop(self): """停止 Web UI 服務""" # 清理所有會話 + cleanup_start_time = time.time() + session_count = len(self.sessions) + for session in list(self.sessions.values()): - session.cleanup() + try: + session._cleanup_sync_enhanced(CleanupReason.SHUTDOWN) + except Exception as e: + debug_log(f"停止服務時清理會話失敗: {e}") + self.sessions.clear() - + self.current_session = None + + # 更新統計 + cleanup_duration = time.time() - cleanup_start_time + self.cleanup_stats.update({ + "total_cleanups": self.cleanup_stats["total_cleanups"] + 1, + "manual_cleanups": self.cleanup_stats["manual_cleanups"] + 1, + "last_cleanup_time": datetime.now().isoformat(), + "total_cleanup_duration": self.cleanup_stats["total_cleanup_duration"] + cleanup_duration, + "sessions_cleaned": self.cleanup_stats["sessions_cleaned"] + session_count + }) + + debug_log(f"停止服務時清理了 {session_count} 個會話,耗時: {cleanup_duration:.2f}秒") + # 停止伺服器(注意:uvicorn 的 graceful shutdown 需要額外處理) if self.server_thread and self.server_thread.is_alive(): debug_log("正在停止 Web UI 服務") diff --git a/src/mcp_feedback_enhanced/web/models/__init__.py b/src/mcp_feedback_enhanced/web/models/__init__.py index d06dd01..36e7108 100644 --- a/src/mcp_feedback_enhanced/web/models/__init__.py +++ b/src/mcp_feedback_enhanced/web/models/__init__.py @@ -7,10 +7,12 @@ Web UI 資料模型模組 定義 Web UI 相關的資料結構和型別。 """ -from .feedback_session import WebFeedbackSession +from .feedback_session import WebFeedbackSession, SessionStatus, CleanupReason from .feedback_result import FeedbackResult __all__ = [ 'WebFeedbackSession', + 'SessionStatus', + 'CleanupReason', 'FeedbackResult' -] \ No newline at end of file +] \ No newline at end of file diff --git a/src/mcp_feedback_enhanced/web/models/feedback_session.py b/src/mcp_feedback_enhanced/web/models/feedback_session.py index 6089f63..05c8d6f 100644 --- a/src/mcp_feedback_enhanced/web/models/feedback_session.py +++ b/src/mcp_feedback_enhanced/web/models/feedback_session.py @@ -11,14 +11,17 @@ import asyncio import base64 import subprocess import threading +import time +from datetime import datetime, timedelta from enum import Enum from pathlib import Path -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Callable from fastapi import WebSocket from ...debug import web_debug_log as debug_log from ...utils.resource_manager import get_resource_manager, register_process +from ...utils.error_handler import ErrorHandler, ErrorType class SessionStatus(Enum): @@ -29,6 +32,17 @@ class SessionStatus(Enum): COMPLETED = "completed" # 已完成 TIMEOUT = "timeout" # 超時 ERROR = "error" # 錯誤 + EXPIRED = "expired" # 已過期 + + +class CleanupReason(Enum): + """清理原因枚舉""" + TIMEOUT = "timeout" # 超時清理 + EXPIRED = "expired" # 過期清理 + MEMORY_PRESSURE = "memory_pressure" # 內存壓力清理 + MANUAL = "manual" # 手動清理 + ERROR = "error" # 錯誤清理 + SHUTDOWN = "shutdown" # 系統關閉清理 # 常數定義 MAX_IMAGE_SIZE = 1 * 1024 * 1024 # 1MB 圖片大小限制 @@ -39,7 +53,8 @@ TEMP_DIR = Path.home() / ".cache" / "interactive-feedback-mcp-web" class WebFeedbackSession: """Web 回饋會話管理""" - def __init__(self, session_id: str, project_directory: str, summary: str): + def __init__(self, session_id: str, project_directory: str, summary: str, + auto_cleanup_delay: int = 3600, max_idle_time: int = 1800): self.session_id = session_id self.project_directory = project_directory self.summary = summary @@ -55,21 +70,49 @@ class WebFeedbackSession: # 新增:會話狀態管理 self.status = SessionStatus.WAITING self.status_message = "等待用戶回饋" - self.created_at = asyncio.get_event_loop().time() + # 統一使用 time.time() 以避免時間基準不一致 + self.created_at = time.time() self.last_activity = self.created_at + # 新增:自動清理配置 + self.auto_cleanup_delay = auto_cleanup_delay # 自動清理延遲時間(秒) + self.max_idle_time = max_idle_time # 最大空閒時間(秒) + self.cleanup_timer: Optional[threading.Timer] = None + self.cleanup_callbacks: List[Callable] = [] # 清理回調函數列表 + + # 新增:清理統計 + self.cleanup_stats = { + "cleanup_count": 0, + "last_cleanup_time": None, + "cleanup_reason": None, + "cleanup_duration": 0.0, + "memory_freed": 0, + "resources_cleaned": 0 + } + # 確保臨時目錄存在 TEMP_DIR.mkdir(parents=True, exist_ok=True) # 獲取資源管理器實例 self.resource_manager = get_resource_manager() + # 啟動自動清理定時器 + self._schedule_auto_cleanup() + + debug_log(f"會話 {self.session_id} 初始化完成,自動清理延遲: {auto_cleanup_delay}秒,最大空閒: {max_idle_time}秒") + def update_status(self, status: SessionStatus, message: str = None): """更新會話狀態""" self.status = status if message: self.status_message = message - self.last_activity = asyncio.get_event_loop().time() + # 統一使用 time.time() + self.last_activity = time.time() + + # 如果會話變為活躍狀態,重置清理定時器 + if status in [SessionStatus.ACTIVE, SessionStatus.FEEDBACK_SUBMITTED]: + self._schedule_auto_cleanup() + debug_log(f"會話 {self.session_id} 狀態更新: {status.value} - {self.status_message}") def get_status_info(self) -> dict: @@ -90,6 +133,117 @@ class WebFeedbackSession: """檢查會話是否活躍""" return self.status in [SessionStatus.WAITING, SessionStatus.ACTIVE, SessionStatus.FEEDBACK_SUBMITTED] + def is_expired(self) -> bool: + """檢查會話是否已過期""" + # 統一使用 time.time() + current_time = time.time() + + # 檢查是否超過最大空閒時間 + idle_time = current_time - self.last_activity + if idle_time > self.max_idle_time: + debug_log(f"會話 {self.session_id} 空閒時間過長: {idle_time:.1f}秒 > {self.max_idle_time}秒") + return True + + # 檢查是否處於已過期狀態 + if self.status == SessionStatus.EXPIRED: + return True + + # 檢查是否處於錯誤或超時狀態且超過一定時間 + if self.status in [SessionStatus.ERROR, SessionStatus.TIMEOUT]: + error_time = current_time - self.last_activity + if error_time > 300: # 錯誤狀態超過5分鐘視為過期 + debug_log(f"會話 {self.session_id} 錯誤狀態時間過長: {error_time:.1f}秒") + return True + + return False + + def get_age(self) -> float: + """獲取會話年齡(秒)""" + current_time = time.time() + return current_time - self.created_at + + def get_idle_time(self) -> float: + """獲取會話空閒時間(秒)""" + current_time = time.time() + return current_time - self.last_activity + + def _schedule_auto_cleanup(self): + """安排自動清理定時器""" + if self.cleanup_timer: + self.cleanup_timer.cancel() + + def auto_cleanup(): + """自動清理回調""" + try: + if not self._cleanup_done and self.is_expired(): + debug_log(f"會話 {self.session_id} 觸發自動清理(過期)") + # 使用異步方式執行清理 + import asyncio + try: + loop = asyncio.get_event_loop() + loop.create_task(self._cleanup_resources_enhanced(CleanupReason.EXPIRED)) + except RuntimeError: + # 如果沒有事件循環,使用同步清理 + self._cleanup_sync_enhanced(CleanupReason.EXPIRED) + else: + # 如果還沒過期,重新安排定時器 + self._schedule_auto_cleanup() + except Exception as e: + error_id = ErrorHandler.log_error_with_context( + e, + context={"session_id": self.session_id, "operation": "自動清理"}, + error_type=ErrorType.SYSTEM + ) + debug_log(f"自動清理失敗 [錯誤ID: {error_id}]: {e}") + + self.cleanup_timer = threading.Timer(self.auto_cleanup_delay, auto_cleanup) + self.cleanup_timer.daemon = True + self.cleanup_timer.start() + debug_log(f"會話 {self.session_id} 自動清理定時器已設置,{self.auto_cleanup_delay}秒後觸發") + + def extend_cleanup_timer(self, additional_time: int = None): + """延長清理定時器""" + if additional_time is None: + additional_time = self.auto_cleanup_delay + + if self.cleanup_timer: + self.cleanup_timer.cancel() + + self.cleanup_timer = threading.Timer(additional_time, lambda: None) + self.cleanup_timer.daemon = True + self.cleanup_timer.start() + + debug_log(f"會話 {self.session_id} 清理定時器已延長 {additional_time} 秒") + + def add_cleanup_callback(self, callback: Callable): + """添加清理回調函數""" + if callback not in self.cleanup_callbacks: + self.cleanup_callbacks.append(callback) + debug_log(f"會話 {self.session_id} 添加清理回調函數") + + def remove_cleanup_callback(self, callback: Callable): + """移除清理回調函數""" + if callback in self.cleanup_callbacks: + self.cleanup_callbacks.remove(callback) + debug_log(f"會話 {self.session_id} 移除清理回調函數") + + def get_cleanup_stats(self) -> dict: + """獲取清理統計信息""" + stats = self.cleanup_stats.copy() + stats.update({ + "session_id": self.session_id, + "age": self.get_age(), + "idle_time": self.get_idle_time(), + "is_expired": self.is_expired(), + "is_active": self.is_active(), + "status": self.status.value, + "has_websocket": self.websocket is not None, + "has_process": self.process is not None, + "command_logs_count": len(self.command_logs), + "images_count": len(self.images) + }) + return stats + async def wait_for_feedback(self, timeout: int = 600) -> dict: """ 等待用戶回饋,包含圖片,支援超時自動清理 @@ -321,33 +475,72 @@ class WebFeedbackSession: pass async def _cleanup_resources_on_timeout(self): - """超時時清理所有資源""" + """超時時清理所有資源(保持向後兼容)""" + await self._cleanup_resources_enhanced(CleanupReason.TIMEOUT) + + async def _cleanup_resources_enhanced(self, reason: CleanupReason): + """增強的資源清理方法""" if self._cleanup_done: return # 避免重複清理 - + + cleanup_start_time = time.time() self._cleanup_done = True - debug_log(f"開始清理會話 {self.session_id} 的資源...") - + + debug_log(f"開始清理會話 {self.session_id} 的資源,原因: {reason.value}") + + # 更新清理統計 + self.cleanup_stats["cleanup_count"] += 1 + self.cleanup_stats["cleanup_reason"] = reason.value + self.cleanup_stats["last_cleanup_time"] = datetime.now().isoformat() + + resources_cleaned = 0 + memory_before = 0 + try: - # 1. 關閉 WebSocket 連接 + # 記錄清理前的內存使用(如果可能) + try: + import psutil + process = psutil.Process() + memory_before = process.memory_info().rss + except: + pass + + # 1. 取消自動清理定時器 + if self.cleanup_timer: + self.cleanup_timer.cancel() + self.cleanup_timer = None + resources_cleaned += 1 + + # 2. 關閉 WebSocket 連接 if self.websocket: try: - # 先通知前端超時 + # 根據清理原因發送不同的通知消息 + message_map = { + CleanupReason.TIMEOUT: "會話已超時,介面將自動關閉", + CleanupReason.EXPIRED: "會話已過期,介面將自動關閉", + CleanupReason.MEMORY_PRESSURE: "系統內存不足,會話將被清理", + CleanupReason.MANUAL: "會話已被手動清理", + CleanupReason.ERROR: "會話發生錯誤,將被清理", + CleanupReason.SHUTDOWN: "系統正在關閉,會話將被清理" + } + await self.websocket.send_json({ - "type": "session_timeout", - "message": "會話已超時,介面將自動關閉" + "type": "session_cleanup", + "reason": reason.value, + "message": message_map.get(reason, "會話將被清理") }) await asyncio.sleep(0.1) # 給前端一點時間處理消息 # 安全關閉 WebSocket await self._safe_close_websocket() debug_log(f"會話 {self.session_id} WebSocket 已關閉") + resources_cleaned += 1 except Exception as e: debug_log(f"關閉 WebSocket 時發生錯誤: {e}") finally: self.websocket = None - - # 2. 終止正在運行的命令進程 + + # 3. 終止正在運行的命令進程 if self.process: try: self.process.terminate() @@ -357,67 +550,213 @@ class WebFeedbackSession: except subprocess.TimeoutExpired: self.process.kill() debug_log(f"會話 {self.session_id} 命令進程已強制終止") + resources_cleaned += 1 except Exception as e: debug_log(f"終止命令進程時發生錯誤: {e}") finally: self.process = None - - # 3. 設置完成事件(防止其他地方還在等待) + + # 4. 設置完成事件(防止其他地方還在等待) self.feedback_completed.set() - - # 4. 清理臨時數據 + + # 5. 清理臨時數據 + logs_count = len(self.command_logs) + images_count = len(self.images) + self.command_logs.clear() self.images.clear() - - debug_log(f"會話 {self.session_id} 資源清理完成") - + self.settings.clear() + + if logs_count > 0 or images_count > 0: + resources_cleaned += logs_count + images_count + debug_log(f"清理了 {logs_count} 條日誌和 {images_count} 張圖片") + + # 6. 更新會話狀態 + if reason == CleanupReason.EXPIRED: + self.status = SessionStatus.EXPIRED + elif reason == CleanupReason.TIMEOUT: + self.status = SessionStatus.TIMEOUT + elif reason == CleanupReason.ERROR: + self.status = SessionStatus.ERROR + else: + self.status = SessionStatus.COMPLETED + + # 7. 調用清理回調函數 + for callback in self.cleanup_callbacks: + try: + if asyncio.iscoroutinefunction(callback): + await callback(self, reason) + else: + callback(self, reason) + except Exception as e: + debug_log(f"清理回調執行失敗: {e}") + + # 8. 計算清理效果 + cleanup_duration = time.time() - cleanup_start_time + memory_after = 0 + try: + import psutil + process = psutil.Process() + memory_after = process.memory_info().rss + except: + pass + + memory_freed = max(0, memory_before - memory_after) + + # 更新清理統計 + self.cleanup_stats.update({ + "cleanup_duration": cleanup_duration, + "memory_freed": memory_freed, + "resources_cleaned": resources_cleaned + }) + + debug_log(f"會話 {self.session_id} 資源清理完成,耗時: {cleanup_duration:.2f}秒," + f"清理資源: {resources_cleaned}個,釋放內存: {memory_freed}字節") + except Exception as e: - debug_log(f"清理會話 {self.session_id} 資源時發生錯誤: {e}") + error_id = ErrorHandler.log_error_with_context( + e, + context={ + "session_id": self.session_id, + "cleanup_reason": reason.value, + "operation": "增強資源清理" + }, + error_type=ErrorType.SYSTEM + ) + debug_log(f"清理會話 {self.session_id} 資源時發生錯誤 [錯誤ID: {error_id}]: {e}") + + # 即使發生錯誤也要更新統計 + self.cleanup_stats["cleanup_duration"] = time.time() - cleanup_start_time def _cleanup_sync(self): - """同步清理會話資源(但保留 WebSocket 連接)""" - if self._cleanup_done: + """同步清理會話資源(但保留 WebSocket 連接)- 保持向後兼容""" + self._cleanup_sync_enhanced(CleanupReason.MANUAL, preserve_websocket=True) + + def _cleanup_sync_enhanced(self, reason: CleanupReason, preserve_websocket: bool = False): + """增強的同步清理會話資源""" + if self._cleanup_done and not preserve_websocket: return - debug_log(f"同步清理會話 {self.session_id} 資源(保留 WebSocket)...") + cleanup_start_time = time.time() + debug_log(f"同步清理會話 {self.session_id} 資源,原因: {reason.value},保留WebSocket: {preserve_websocket}") - # 只清理進程,不清理 WebSocket 連接 - if self.process: + # 更新清理統計 + self.cleanup_stats["cleanup_count"] += 1 + self.cleanup_stats["cleanup_reason"] = reason.value + self.cleanup_stats["last_cleanup_time"] = datetime.now().isoformat() + + resources_cleaned = 0 + memory_before = 0 + + try: + # 記錄清理前的內存使用 try: - self.process.terminate() - self.process.wait(timeout=5) + import psutil + process = psutil.Process() + memory_before = process.memory_info().rss except: - try: - self.process.kill() - except: - pass - self.process = None + pass - # 清理臨時數據 - self.command_logs.clear() - # 注意:不設置 _cleanup_done = True,因為還需要清理 WebSocket + # 1. 取消自動清理定時器 + if self.cleanup_timer: + self.cleanup_timer.cancel() + self.cleanup_timer = None + resources_cleaned += 1 + + # 2. 清理進程 + if self.process: + try: + self.process.terminate() + self.process.wait(timeout=5) + debug_log(f"會話 {self.session_id} 命令進程已正常終止") + resources_cleaned += 1 + except: + try: + self.process.kill() + debug_log(f"會話 {self.session_id} 命令進程已強制終止") + resources_cleaned += 1 + except: + pass + self.process = None + + # 3. 清理臨時數據 + logs_count = len(self.command_logs) + images_count = len(self.images) + + self.command_logs.clear() + if not preserve_websocket: + self.images.clear() + self.settings.clear() + resources_cleaned += images_count + + resources_cleaned += logs_count + + # 4. 設置完成事件 + if not preserve_websocket: + self.feedback_completed.set() + + # 5. 更新狀態 + if not preserve_websocket: + if reason == CleanupReason.EXPIRED: + self.status = SessionStatus.EXPIRED + elif reason == CleanupReason.TIMEOUT: + self.status = SessionStatus.TIMEOUT + elif reason == CleanupReason.ERROR: + self.status = SessionStatus.ERROR + else: + self.status = SessionStatus.COMPLETED + + self._cleanup_done = True + + # 6. 調用清理回調函數(同步版本) + for callback in self.cleanup_callbacks: + try: + if not asyncio.iscoroutinefunction(callback): + callback(self, reason) + except Exception as e: + debug_log(f"同步清理回調執行失敗: {e}") + + # 7. 計算清理效果 + cleanup_duration = time.time() - cleanup_start_time + memory_after = 0 + try: + import psutil + process = psutil.Process() + memory_after = process.memory_info().rss + except: + pass + + memory_freed = max(0, memory_before - memory_after) + + # 更新清理統計 + self.cleanup_stats.update({ + "cleanup_duration": cleanup_duration, + "memory_freed": memory_freed, + "resources_cleaned": resources_cleaned + }) + + debug_log(f"會話 {self.session_id} 同步清理完成,耗時: {cleanup_duration:.2f}秒," + f"清理資源: {resources_cleaned}個,釋放內存: {memory_freed}字節") + + except Exception as e: + error_id = ErrorHandler.log_error_with_context( + e, + context={ + "session_id": self.session_id, + "cleanup_reason": reason.value, + "preserve_websocket": preserve_websocket, + "operation": "同步資源清理" + }, + error_type=ErrorType.SYSTEM + ) + debug_log(f"同步清理會話 {self.session_id} 資源時發生錯誤 [錯誤ID: {error_id}]: {e}") + + # 即使發生錯誤也要更新統計 + self.cleanup_stats["cleanup_duration"] = time.time() - cleanup_start_time def cleanup(self): """同步清理會話資源(保持向後兼容)""" - if self._cleanup_done: - return - - self._cleanup_done = True - debug_log(f"同步清理會話 {self.session_id} 資源...") - - if self.process: - try: - self.process.terminate() - self.process.wait(timeout=5) - except: - try: - self.process.kill() - except: - pass - self.process = None - - # 設置完成事件 - self.feedback_completed.set() + self._cleanup_sync_enhanced(CleanupReason.MANUAL) async def _safe_close_websocket(self): """安全關閉 WebSocket 連接,避免事件循環衝突""" diff --git a/src/mcp_feedback_enhanced/web/utils/session_cleanup_manager.py b/src/mcp_feedback_enhanced/web/utils/session_cleanup_manager.py new file mode 100644 index 0000000..5534db3 --- /dev/null +++ b/src/mcp_feedback_enhanced/web/utils/session_cleanup_manager.py @@ -0,0 +1,504 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +會話清理管理器 +============== + +統一管理 Web 會話的清理策略、統計和性能監控。 +與內存監控系統深度集成,提供智能清理決策。 +""" + +import time +import threading +from datetime import datetime, timedelta +from typing import Dict, List, Optional, Callable, Any +from dataclasses import dataclass, field +from enum import Enum + +from ...debug import web_debug_log as debug_log +from ...utils.error_handler import ErrorHandler, ErrorType +from ..models.feedback_session import CleanupReason, SessionStatus + + +@dataclass +class CleanupPolicy: + """清理策略配置""" + max_idle_time: int = 1800 # 最大空閒時間(秒) + max_session_age: int = 7200 # 最大會話年齡(秒) + max_sessions: int = 10 # 最大會話數量 + cleanup_interval: int = 300 # 清理間隔(秒) + memory_pressure_threshold: float = 0.8 # 內存壓力閾值 + enable_auto_cleanup: bool = True # 啟用自動清理 + preserve_active_session: bool = True # 保護活躍會話 + + +@dataclass +class CleanupStats: + """清理統計數據""" + total_cleanups: int = 0 + expired_cleanups: int = 0 + memory_pressure_cleanups: int = 0 + manual_cleanups: int = 0 + auto_cleanups: int = 0 + total_sessions_cleaned: int = 0 + total_cleanup_time: float = 0.0 + average_cleanup_time: float = 0.0 + last_cleanup_time: Optional[datetime] = None + cleanup_efficiency: float = 0.0 # 清理效率(清理的會話數/總會話數) + + +class CleanupTrigger(Enum): + """清理觸發器類型""" + AUTO = "auto" # 自動清理 + MEMORY_PRESSURE = "memory_pressure" # 內存壓力 + MANUAL = "manual" # 手動清理 + EXPIRED = "expired" # 過期清理 + CAPACITY = "capacity" # 容量限制 + + +class SessionCleanupManager: + """會話清理管理器""" + + def __init__(self, web_ui_manager, policy: CleanupPolicy = None): + """ + 初始化會話清理管理器 + + Args: + web_ui_manager: WebUIManager 實例 + policy: 清理策略配置 + """ + self.web_ui_manager = web_ui_manager + self.policy = policy or CleanupPolicy() + self.stats = CleanupStats() + + # 清理狀態 + self.is_running = False + self.cleanup_thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + + # 回調函數 + self.cleanup_callbacks: List[Callable] = [] + self.stats_callbacks: List[Callable] = [] + + # 清理歷史記錄 + self.cleanup_history: List[Dict[str, Any]] = [] + self.max_history = 100 + + debug_log("SessionCleanupManager 初始化完成") + + def start_auto_cleanup(self) -> bool: + """啟動自動清理""" + if not self.policy.enable_auto_cleanup: + debug_log("自動清理已禁用") + return False + + if self.is_running: + debug_log("自動清理已在運行") + return True + + try: + self.is_running = True + self._stop_event.clear() + + self.cleanup_thread = threading.Thread( + target=self._auto_cleanup_loop, + name="SessionCleanupManager", + daemon=True + ) + self.cleanup_thread.start() + + debug_log(f"自動清理已啟動,間隔 {self.policy.cleanup_interval} 秒") + return True + + except Exception as e: + self.is_running = False + error_id = ErrorHandler.log_error_with_context( + e, + context={"operation": "啟動自動清理"}, + error_type=ErrorType.SYSTEM + ) + debug_log(f"啟動自動清理失敗 [錯誤ID: {error_id}]: {e}") + return False + + def stop_auto_cleanup(self) -> bool: + """停止自動清理""" + if not self.is_running: + debug_log("自動清理未在運行") + return True + + try: + self.is_running = False + self._stop_event.set() + + if self.cleanup_thread and self.cleanup_thread.is_alive(): + self.cleanup_thread.join(timeout=5) + + debug_log("自動清理已停止") + return True + + except Exception as e: + error_id = ErrorHandler.log_error_with_context( + e, + context={"operation": "停止自動清理"}, + error_type=ErrorType.SYSTEM + ) + debug_log(f"停止自動清理失敗 [錯誤ID: {error_id}]: {e}") + return False + + def _auto_cleanup_loop(self): + """自動清理主循環""" + debug_log("自動清理循環開始") + + while not self._stop_event.is_set(): + try: + # 執行清理檢查 + self._perform_auto_cleanup() + + # 等待下次清理 + if self._stop_event.wait(self.policy.cleanup_interval): + break + + except Exception as e: + error_id = ErrorHandler.log_error_with_context( + e, + context={"operation": "自動清理循環"}, + error_type=ErrorType.SYSTEM + ) + debug_log(f"自動清理循環錯誤 [錯誤ID: {error_id}]: {e}") + + # 發生錯誤時等待較短時間後重試 + if self._stop_event.wait(30): + break + + debug_log("自動清理循環結束") + + def _perform_auto_cleanup(self): + """執行自動清理""" + cleanup_start_time = time.time() + cleaned_sessions = 0 + + try: + # 1. 檢查會話數量限制 + if len(self.web_ui_manager.sessions) > self.policy.max_sessions: + cleaned = self._cleanup_by_capacity() + cleaned_sessions += cleaned + debug_log(f"容量限制清理了 {cleaned} 個會話") + + # 2. 清理過期會話 + cleaned = self._cleanup_expired_sessions() + cleaned_sessions += cleaned + + # 3. 清理空閒會話 + cleaned = self._cleanup_idle_sessions() + cleaned_sessions += cleaned + + # 4. 更新統計 + cleanup_duration = time.time() - cleanup_start_time + self._update_cleanup_stats( + CleanupTrigger.AUTO, + cleaned_sessions, + cleanup_duration + ) + + if cleaned_sessions > 0: + debug_log(f"自動清理完成,清理了 {cleaned_sessions} 個會話,耗時: {cleanup_duration:.2f}秒") + + except Exception as e: + error_id = ErrorHandler.log_error_with_context( + e, + context={"operation": "執行自動清理"}, + error_type=ErrorType.SYSTEM + ) + debug_log(f"執行自動清理失敗 [錯誤ID: {error_id}]: {e}") + + def trigger_cleanup(self, trigger: CleanupTrigger, force: bool = False) -> int: + """觸發清理操作""" + cleanup_start_time = time.time() + cleaned_sessions = 0 + + try: + debug_log(f"觸發清理操作,觸發器: {trigger.value},強制: {force}") + + if trigger == CleanupTrigger.MEMORY_PRESSURE: + cleaned_sessions = self.web_ui_manager.cleanup_sessions_by_memory_pressure(force) + elif trigger == CleanupTrigger.EXPIRED: + cleaned_sessions = self.web_ui_manager.cleanup_expired_sessions() + elif trigger == CleanupTrigger.CAPACITY: + cleaned_sessions = self._cleanup_by_capacity() + elif trigger == CleanupTrigger.MANUAL: + # 手動清理:組合多種策略 + cleaned_sessions += self.web_ui_manager.cleanup_expired_sessions() + if force: + cleaned_sessions += self.web_ui_manager.cleanup_sessions_by_memory_pressure(force) + else: + # 自動清理 + self._perform_auto_cleanup() + return 0 # 統計已在 _perform_auto_cleanup 中更新 + + # 更新統計 + cleanup_duration = time.time() - cleanup_start_time + self._update_cleanup_stats(trigger, cleaned_sessions, cleanup_duration) + + debug_log(f"清理操作完成,清理了 {cleaned_sessions} 個會話,耗時: {cleanup_duration:.2f}秒") + return cleaned_sessions + + except Exception as e: + error_id = ErrorHandler.log_error_with_context( + e, + context={"operation": "觸發清理", "trigger": trigger.value, "force": force}, + error_type=ErrorType.SYSTEM + ) + debug_log(f"觸發清理操作失敗 [錯誤ID: {error_id}]: {e}") + return 0 + + def _cleanup_by_capacity(self) -> int: + """根據容量限制清理會話""" + sessions = self.web_ui_manager.sessions + if len(sessions) <= self.policy.max_sessions: + return 0 + + # 計算需要清理的會話數量 + excess_count = len(sessions) - self.policy.max_sessions + + # 按優先級排序會話(優先清理舊的、非活躍的會話) + session_priorities = [] + for session_id, session in sessions.items(): + # 跳過當前活躍會話(如果啟用保護) + if (self.policy.preserve_active_session and + self.web_ui_manager.current_session and + session.session_id == self.web_ui_manager.current_session.session_id): + continue + + # 計算優先級分數(分數越高越優先清理) + priority_score = 0 + + # 狀態優先級 + if session.status in [SessionStatus.COMPLETED, SessionStatus.ERROR, SessionStatus.TIMEOUT]: + priority_score += 100 + elif session.status == SessionStatus.FEEDBACK_SUBMITTED: + priority_score += 50 + + # 年齡優先級 + age = session.get_age() + priority_score += age / 60 # 每分鐘加1分 + + # 空閒時間優先級 + idle_time = session.get_idle_time() + priority_score += idle_time / 30 # 每30秒加1分 + + session_priorities.append((session_id, session, priority_score)) + + # 按優先級排序並清理 + session_priorities.sort(key=lambda x: x[2], reverse=True) + cleaned_count = 0 + + for i in range(min(excess_count, len(session_priorities))): + session_id, session, _ = session_priorities[i] + try: + session._cleanup_sync_enhanced(CleanupReason.MANUAL) + del self.web_ui_manager.sessions[session_id] + cleaned_count += 1 + except Exception as e: + debug_log(f"容量清理會話 {session_id} 失敗: {e}") + + return cleaned_count + + def _cleanup_expired_sessions(self) -> int: + """清理過期會話""" + expired_sessions = [] + current_time = time.time() + + for session_id, session in self.web_ui_manager.sessions.items(): + # 檢查是否過期 + if session.is_expired(): + expired_sessions.append(session_id) + # 檢查是否超過最大年齡 + elif session.get_age() > self.policy.max_session_age: + expired_sessions.append(session_id) + + # 清理過期會話 + cleaned_count = 0 + for session_id in expired_sessions: + try: + session = self.web_ui_manager.sessions.get(session_id) + if session: + session._cleanup_sync_enhanced(CleanupReason.EXPIRED) + del self.web_ui_manager.sessions[session_id] + cleaned_count += 1 + + # 如果清理的是當前活躍會話,清空當前會話 + if (self.web_ui_manager.current_session and + self.web_ui_manager.current_session.session_id == session_id): + self.web_ui_manager.current_session = None + + except Exception as e: + debug_log(f"清理過期會話 {session_id} 失敗: {e}") + + return cleaned_count + + def _cleanup_idle_sessions(self) -> int: + """清理空閒會話""" + idle_sessions = [] + + for session_id, session in self.web_ui_manager.sessions.items(): + # 跳過當前活躍會話(如果啟用保護) + if (self.policy.preserve_active_session and + self.web_ui_manager.current_session and + session.session_id == self.web_ui_manager.current_session.session_id): + continue + + # 檢查是否空閒時間過長 + if session.get_idle_time() > self.policy.max_idle_time: + idle_sessions.append(session_id) + + # 清理空閒會話 + cleaned_count = 0 + for session_id in idle_sessions: + try: + session = self.web_ui_manager.sessions.get(session_id) + if session: + session._cleanup_sync_enhanced(CleanupReason.EXPIRED) + del self.web_ui_manager.sessions[session_id] + cleaned_count += 1 + + except Exception as e: + debug_log(f"清理空閒會話 {session_id} 失敗: {e}") + + return cleaned_count + + def _update_cleanup_stats(self, trigger: CleanupTrigger, cleaned_count: int, duration: float): + """更新清理統計""" + self.stats.total_cleanups += 1 + self.stats.total_sessions_cleaned += cleaned_count + self.stats.total_cleanup_time += duration + self.stats.last_cleanup_time = datetime.now() + + # 更新平均清理時間 + if self.stats.total_cleanups > 0: + self.stats.average_cleanup_time = self.stats.total_cleanup_time / self.stats.total_cleanups + + # 更新清理效率 + total_sessions = len(self.web_ui_manager.sessions) + cleaned_count + if total_sessions > 0: + self.stats.cleanup_efficiency = cleaned_count / total_sessions + + # 根據觸發器類型更新統計 + if trigger == CleanupTrigger.AUTO: + self.stats.auto_cleanups += 1 + elif trigger == CleanupTrigger.MEMORY_PRESSURE: + self.stats.memory_pressure_cleanups += 1 + elif trigger == CleanupTrigger.EXPIRED: + self.stats.expired_cleanups += 1 + elif trigger == CleanupTrigger.MANUAL: + self.stats.manual_cleanups += 1 + + # 記錄清理歷史 + cleanup_record = { + "timestamp": datetime.now().isoformat(), + "trigger": trigger.value, + "cleaned_count": cleaned_count, + "duration": duration, + "total_sessions_before": total_sessions, + "total_sessions_after": len(self.web_ui_manager.sessions) + } + + self.cleanup_history.append(cleanup_record) + + # 限制歷史記錄數量 + if len(self.cleanup_history) > self.max_history: + self.cleanup_history = self.cleanup_history[-self.max_history:] + + # 調用統計回調 + for callback in self.stats_callbacks: + try: + callback(self.stats, cleanup_record) + except Exception as e: + debug_log(f"統計回調執行失敗: {e}") + + def get_cleanup_statistics(self) -> Dict[str, Any]: + """獲取清理統計數據""" + stats_dict = { + "total_cleanups": self.stats.total_cleanups, + "expired_cleanups": self.stats.expired_cleanups, + "memory_pressure_cleanups": self.stats.memory_pressure_cleanups, + "manual_cleanups": self.stats.manual_cleanups, + "auto_cleanups": self.stats.auto_cleanups, + "total_sessions_cleaned": self.stats.total_sessions_cleaned, + "total_cleanup_time": round(self.stats.total_cleanup_time, 2), + "average_cleanup_time": round(self.stats.average_cleanup_time, 2), + "cleanup_efficiency": round(self.stats.cleanup_efficiency, 3), + "last_cleanup_time": self.stats.last_cleanup_time.isoformat() if self.stats.last_cleanup_time else None, + "is_auto_cleanup_running": self.is_running, + "current_sessions": len(self.web_ui_manager.sessions), + "policy": { + "max_idle_time": self.policy.max_idle_time, + "max_session_age": self.policy.max_session_age, + "max_sessions": self.policy.max_sessions, + "cleanup_interval": self.policy.cleanup_interval, + "enable_auto_cleanup": self.policy.enable_auto_cleanup, + "preserve_active_session": self.policy.preserve_active_session + } + } + + return stats_dict + + def get_cleanup_history(self, limit: int = 20) -> List[Dict[str, Any]]: + """獲取清理歷史記錄""" + return self.cleanup_history[-limit:] if self.cleanup_history else [] + + def add_cleanup_callback(self, callback: Callable): + """添加清理回調函數""" + if callback not in self.cleanup_callbacks: + self.cleanup_callbacks.append(callback) + debug_log("添加清理回調函數") + + def add_stats_callback(self, callback: Callable): + """添加統計回調函數""" + if callback not in self.stats_callbacks: + self.stats_callbacks.append(callback) + debug_log("添加統計回調函數") + + def update_policy(self, **kwargs): + """更新清理策略""" + for key, value in kwargs.items(): + if hasattr(self.policy, key): + setattr(self.policy, key, value) + debug_log(f"更新清理策略 {key} = {value}") + else: + debug_log(f"未知的策略參數: {key}") + + def reset_stats(self): + """重置統計數據""" + self.stats = CleanupStats() + self.cleanup_history.clear() + debug_log("清理統計數據已重置") + + def force_cleanup_all(self, exclude_current: bool = True) -> int: + """強制清理所有會話""" + sessions_to_clean = [] + + for session_id, session in self.web_ui_manager.sessions.items(): + # 是否排除當前活躍會話 + if (exclude_current and + self.web_ui_manager.current_session and + session.session_id == self.web_ui_manager.current_session.session_id): + continue + sessions_to_clean.append(session_id) + + # 清理會話 + cleaned_count = 0 + for session_id in sessions_to_clean: + try: + session = self.web_ui_manager.sessions.get(session_id) + if session: + session._cleanup_sync_enhanced(CleanupReason.MANUAL) + del self.web_ui_manager.sessions[session_id] + cleaned_count += 1 + except Exception as e: + debug_log(f"強制清理會話 {session_id} 失敗: {e}") + + # 更新統計 + self._update_cleanup_stats(CleanupTrigger.MANUAL, cleaned_count, 0.0) + + debug_log(f"強制清理完成,清理了 {cleaned_count} 個會話") + return cleaned_count diff --git a/tests/test_session_cleanup.py b/tests/test_session_cleanup.py new file mode 100644 index 0000000..96900ca --- /dev/null +++ b/tests/test_session_cleanup.py @@ -0,0 +1,366 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +會話清理優化測試 +================ + +測試 WebFeedbackSession 和 SessionCleanupManager 的清理功能。 +""" + +import asyncio +import pytest +import time +import threading +from unittest.mock import Mock, patch, MagicMock +from datetime import datetime, timedelta + +import sys +import os +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from src.mcp_feedback_enhanced.web.models.feedback_session import ( + WebFeedbackSession, SessionStatus, CleanupReason +) +from src.mcp_feedback_enhanced.web.utils.session_cleanup_manager import ( + SessionCleanupManager, CleanupPolicy, CleanupTrigger +) + + +class TestWebFeedbackSessionCleanup: + """測試 WebFeedbackSession 清理功能""" + + def setup_method(self): + """測試前設置""" + self.session_id = "test_session_001" + self.project_dir = "/tmp/test_project" + self.summary = "測試會話摘要" + + # 創建測試會話 + self.session = WebFeedbackSession( + self.session_id, + self.project_dir, + self.summary, + auto_cleanup_delay=60, # 1分鐘自動清理 + max_idle_time=30 # 30秒最大空閒時間 + ) + + def teardown_method(self): + """測試後清理""" + if hasattr(self, 'session') and self.session: + try: + self.session._cleanup_sync_enhanced(CleanupReason.MANUAL) + except: + pass + + def test_session_initialization(self): + """測試會話初始化""" + assert self.session.session_id == self.session_id + assert self.session.project_directory == self.project_dir + assert self.session.summary == self.summary + assert self.session.status == SessionStatus.WAITING + assert self.session.auto_cleanup_delay == 60 + assert self.session.max_idle_time == 30 + assert self.session.cleanup_timer is not None + assert len(self.session.cleanup_stats) > 0 + + def test_is_expired_by_idle_time(self): + """測試空閒時間過期檢測""" + # 新創建的會話不應該過期 + assert not self.session.is_expired() + + # 模擬空閒時間過長 + self.session.last_activity = time.time() - 40 # 40秒前 + assert self.session.is_expired() + + def test_is_expired_by_status(self): + """測試狀態過期檢測""" + # 設置為錯誤狀態 + self.session.status = SessionStatus.ERROR + self.session.last_activity = time.time() - 400 # 400秒前 + assert self.session.is_expired() + + # 設置為已過期狀態 + self.session.status = SessionStatus.EXPIRED + assert self.session.is_expired() + + def test_get_age_and_idle_time(self): + """測試年齡和空閒時間計算""" + # 測試年齡 + age = self.session.get_age() + assert age >= 0 + assert age < 1 # 剛創建,應該小於1秒 + + # 測試空閒時間 + idle_time = self.session.get_idle_time() + assert idle_time >= 0 + assert idle_time < 1 # 剛創建,應該小於1秒 + + def test_cleanup_timer_scheduling(self): + """測試清理定時器調度""" + # 檢查定時器是否已設置 + assert self.session.cleanup_timer is not None + assert self.session.cleanup_timer.is_alive() + + # 測試延長定時器 + old_timer = self.session.cleanup_timer + self.session.extend_cleanup_timer(120) + + # 應該創建新的定時器 + assert self.session.cleanup_timer != old_timer + assert self.session.cleanup_timer.is_alive() + + def test_cleanup_callbacks(self): + """測試清理回調函數""" + callback_called = False + callback_session = None + callback_reason = None + + def test_callback(session, reason): + nonlocal callback_called, callback_session, callback_reason + callback_called = True + callback_session = session + callback_reason = reason + + # 添加回調 + self.session.add_cleanup_callback(test_callback) + assert len(self.session.cleanup_callbacks) == 1 + + # 執行清理 + self.session._cleanup_sync_enhanced(CleanupReason.MANUAL) + + # 檢查回調是否被調用 + assert callback_called + assert callback_session == self.session + assert callback_reason == CleanupReason.MANUAL + + # 移除回調 + self.session.remove_cleanup_callback(test_callback) + assert len(self.session.cleanup_callbacks) == 0 + + def test_cleanup_stats(self): + """測試清理統計""" + # 初始統計 + stats = self.session.get_cleanup_stats() + assert stats["cleanup_count"] == 0 + assert stats["session_id"] == self.session_id + assert stats["is_active"] == True + + # 執行清理 + self.session._cleanup_sync_enhanced(CleanupReason.EXPIRED) + + # 檢查統計更新 + stats = self.session.get_cleanup_stats() + assert stats["cleanup_count"] == 1 + assert stats["cleanup_reason"] == CleanupReason.EXPIRED.value + assert stats["last_cleanup_time"] is not None + assert stats["cleanup_duration"] >= 0 + + @pytest.mark.asyncio + async def test_async_cleanup(self): + """測試異步清理""" + # 模擬 WebSocket 連接 + mock_websocket = Mock() + mock_websocket.send_json = Mock(return_value=asyncio.Future()) + mock_websocket.send_json.return_value.set_result(None) + mock_websocket.close = Mock(return_value=asyncio.Future()) + mock_websocket.close.return_value.set_result(None) + mock_websocket.client_state.DISCONNECTED = False + + self.session.websocket = mock_websocket + + # 執行異步清理 + await self.session._cleanup_resources_enhanced(CleanupReason.TIMEOUT) + + # 檢查 WebSocket 是否被正確處理 + mock_websocket.send_json.assert_called_once() + + # 檢查清理統計 + stats = self.session.get_cleanup_stats() + assert stats["cleanup_count"] == 1 + assert stats["cleanup_reason"] == CleanupReason.TIMEOUT.value + + def test_status_update_resets_timer(self): + """測試狀態更新重置定時器""" + old_timer = self.session.cleanup_timer + + # 更新狀態為活躍 + self.session.update_status(SessionStatus.ACTIVE, "測試活躍狀態") + + # 檢查定時器是否被重置 + assert self.session.cleanup_timer != old_timer + assert self.session.cleanup_timer.is_alive() + assert self.session.status == SessionStatus.ACTIVE + + +class TestSessionCleanupManager: + """測試 SessionCleanupManager 功能""" + + def setup_method(self): + """測試前設置""" + # 創建模擬的 WebUIManager + self.mock_web_ui_manager = Mock() + self.mock_web_ui_manager.sessions = {} + self.mock_web_ui_manager.current_session = None + self.mock_web_ui_manager.cleanup_expired_sessions = Mock(return_value=0) + self.mock_web_ui_manager.cleanup_sessions_by_memory_pressure = Mock(return_value=0) + + # 創建清理策略 + self.policy = CleanupPolicy( + max_idle_time=30, + max_session_age=300, + max_sessions=5, + cleanup_interval=10, + enable_auto_cleanup=True + ) + + # 創建清理管理器 + self.cleanup_manager = SessionCleanupManager( + self.mock_web_ui_manager, + self.policy + ) + + def teardown_method(self): + """測試後清理""" + if hasattr(self, 'cleanup_manager'): + self.cleanup_manager.stop_auto_cleanup() + + def test_cleanup_manager_initialization(self): + """測試清理管理器初始化""" + assert self.cleanup_manager.web_ui_manager == self.mock_web_ui_manager + assert self.cleanup_manager.policy == self.policy + assert not self.cleanup_manager.is_running + assert self.cleanup_manager.cleanup_thread is None + assert len(self.cleanup_manager.cleanup_callbacks) == 0 + assert len(self.cleanup_manager.cleanup_history) == 0 + + def test_auto_cleanup_start_stop(self): + """測試自動清理啟動和停止""" + # 啟動自動清理 + result = self.cleanup_manager.start_auto_cleanup() + assert result == True + assert self.cleanup_manager.is_running == True + assert self.cleanup_manager.cleanup_thread is not None + assert self.cleanup_manager.cleanup_thread.is_alive() + + # 停止自動清理 + result = self.cleanup_manager.stop_auto_cleanup() + assert result == True + assert self.cleanup_manager.is_running == False + + def test_trigger_cleanup_memory_pressure(self): + """測試內存壓力清理觸發""" + # 設置模擬返回值 + self.mock_web_ui_manager.cleanup_sessions_by_memory_pressure.return_value = 3 + + # 觸發內存壓力清理 + cleaned = self.cleanup_manager.trigger_cleanup(CleanupTrigger.MEMORY_PRESSURE, force=True) + + # 檢查結果 + assert cleaned == 3 + self.mock_web_ui_manager.cleanup_sessions_by_memory_pressure.assert_called_once_with(True) + + # 檢查統計更新 + stats = self.cleanup_manager.get_cleanup_statistics() + assert stats["total_cleanups"] == 1 + assert stats["memory_pressure_cleanups"] == 1 + assert stats["total_sessions_cleaned"] == 3 + + def test_trigger_cleanup_expired(self): + """測試過期清理觸發""" + # 設置模擬返回值 + self.mock_web_ui_manager.cleanup_expired_sessions.return_value = 2 + + # 觸發過期清理 + cleaned = self.cleanup_manager.trigger_cleanup(CleanupTrigger.EXPIRED) + + # 檢查結果 + assert cleaned == 2 + self.mock_web_ui_manager.cleanup_expired_sessions.assert_called_once() + + # 檢查統計更新 + stats = self.cleanup_manager.get_cleanup_statistics() + assert stats["total_cleanups"] == 1 + assert stats["expired_cleanups"] == 1 + assert stats["total_sessions_cleaned"] == 2 + + def test_cleanup_statistics(self): + """測試清理統計功能""" + # 初始統計 + stats = self.cleanup_manager.get_cleanup_statistics() + assert stats["total_cleanups"] == 0 + assert stats["total_sessions_cleaned"] == 0 + assert stats["is_auto_cleanup_running"] == False + + # 執行一些清理操作 + self.mock_web_ui_manager.cleanup_expired_sessions.return_value = 1 + self.cleanup_manager.trigger_cleanup(CleanupTrigger.EXPIRED) + + self.mock_web_ui_manager.cleanup_sessions_by_memory_pressure.return_value = 2 + self.cleanup_manager.trigger_cleanup(CleanupTrigger.MEMORY_PRESSURE) + + # 檢查統計 + stats = self.cleanup_manager.get_cleanup_statistics() + assert stats["total_cleanups"] == 2 + assert stats["expired_cleanups"] == 1 + assert stats["memory_pressure_cleanups"] == 1 + assert stats["total_sessions_cleaned"] == 3 + assert stats["average_cleanup_time"] >= 0 + + def test_cleanup_history(self): + """測試清理歷史記錄""" + # 初始歷史為空 + history = self.cleanup_manager.get_cleanup_history() + assert len(history) == 0 + + # 執行清理操作 + self.mock_web_ui_manager.cleanup_expired_sessions.return_value = 1 + self.cleanup_manager.trigger_cleanup(CleanupTrigger.EXPIRED) + + # 檢查歷史記錄 + history = self.cleanup_manager.get_cleanup_history() + assert len(history) == 1 + + record = history[0] + assert record["trigger"] == CleanupTrigger.EXPIRED.value + assert record["cleaned_count"] == 1 + assert "timestamp" in record + assert "duration" in record + + def test_policy_update(self): + """測試策略更新""" + # 更新策略 + self.cleanup_manager.update_policy( + max_idle_time=60, + max_sessions=10, + enable_auto_cleanup=False + ) + + # 檢查策略是否更新 + assert self.cleanup_manager.policy.max_idle_time == 60 + assert self.cleanup_manager.policy.max_sessions == 10 + assert self.cleanup_manager.policy.enable_auto_cleanup == False + + def test_stats_reset(self): + """測試統計重置""" + # 執行一些操作產生統計 + self.mock_web_ui_manager.cleanup_expired_sessions.return_value = 1 + self.cleanup_manager.trigger_cleanup(CleanupTrigger.EXPIRED) + + # 檢查有統計數據 + stats = self.cleanup_manager.get_cleanup_statistics() + assert stats["total_cleanups"] > 0 + + # 重置統計 + self.cleanup_manager.reset_stats() + + # 檢查統計已重置 + stats = self.cleanup_manager.get_cleanup_statistics() + assert stats["total_cleanups"] == 0 + assert stats["total_sessions_cleaned"] == 0 + + history = self.cleanup_manager.get_cleanup_history() + assert len(history) == 0 + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])