新增會話清理管理器,整合自動清理策略、統計與性能監控,提升系統資源管理效率。擴展 WebFeedbackSession 及 SessionCleanupManager 功能,支持過期與內存壓力清理,並新增測試模組以確保功能正確性。

This commit is contained in:
Minidoracat 2025-06-07 02:54:46 +08:00
parent 90deebdee5
commit b9d781f147
5 changed files with 1481 additions and 65 deletions

View File

@ -17,7 +17,8 @@ import threading
import time
import webbrowser
from pathlib import Path
from typing import Dict, Optional
from typing import Dict, Optional, List
from datetime import datetime
import uuid
from fastapi import FastAPI, Request, Response
@ -26,7 +27,7 @@ from fastapi.templating import Jinja2Templates
from fastapi.middleware.gzip import GZipMiddleware
import uvicorn
from .models import WebFeedbackSession, FeedbackResult
from .models import WebFeedbackSession, FeedbackResult, CleanupReason, SessionStatus
from .routes import setup_routes
from .utils import find_free_port, get_browser_opener
from .utils.port_manager import PortManager
@ -85,6 +86,17 @@ class WebUIManager:
# 會話更新通知標記
self._pending_session_update = False
# 會話清理統計
self.cleanup_stats = {
"total_cleanups": 0,
"expired_cleanups": 0,
"memory_pressure_cleanups": 0,
"manual_cleanups": 0,
"last_cleanup_time": None,
"total_cleanup_duration": 0.0,
"sessions_cleaned": 0
}
self.server_thread = None
self.server_process = None
self.i18n = get_i18n_manager()
@ -148,16 +160,46 @@ class WebUIManager:
# 添加 Web 應用特定的警告回調
def web_memory_alert(alert):
debug_log(f"Web UI 內存警告 [{alert.level}]: {alert.message}")
# 可以在這裡添加更多 Web 特定的處理邏輯
# 例如:通過 WebSocket 通知前端、記錄到特定日誌等
# 根據警告級別觸發不同的清理策略
if alert.level == "critical":
# 危險級別:清理過期會話
cleaned = self.cleanup_expired_sessions()
debug_log(f"內存危險警告觸發,清理了 {cleaned} 個過期會話")
elif alert.level == "emergency":
# 緊急級別:強制清理會話
cleaned = self.cleanup_sessions_by_memory_pressure(force=True)
debug_log(f"內存緊急警告觸發,強制清理了 {cleaned} 個會話")
self.memory_monitor.add_alert_callback(web_memory_alert)
# 添加會話清理回調到內存監控
def session_cleanup_callback(force: bool = False):
"""內存監控觸發的會話清理回調"""
try:
if force:
# 強制清理:包括內存壓力清理
cleaned = self.cleanup_sessions_by_memory_pressure(force=True)
debug_log(f"內存監控強制清理了 {cleaned} 個會話")
else:
# 常規清理:只清理過期會話
cleaned = self.cleanup_expired_sessions()
debug_log(f"內存監控清理了 {cleaned} 個過期會話")
except Exception as e:
error_id = ErrorHandler.log_error_with_context(
e,
context={"operation": "內存監控會話清理", "force": force},
error_type=ErrorType.SYSTEM
)
debug_log(f"內存監控會話清理失敗 [錯誤ID: {error_id}]: {e}")
self.memory_monitor.add_cleanup_callback(session_cleanup_callback)
# 確保內存監控已啟動ResourceManager 可能已經啟動了)
if not self.memory_monitor.is_monitoring:
self.memory_monitor.start_monitoring()
debug_log("Web UI 內存監控設置完成")
debug_log("Web UI 內存監控設置完成,已集成會話清理回調")
except Exception as e:
error_id = ErrorHandler.log_error_with_context(
@ -563,12 +605,175 @@ class WebUIManager:
"""獲取伺服器 URL"""
return f"http://{self.host}:{self.port}"
def cleanup_expired_sessions(self) -> int:
"""清理過期會話"""
cleanup_start_time = time.time()
expired_sessions = []
# 掃描過期會話
for session_id, session in self.sessions.items():
if session.is_expired():
expired_sessions.append(session_id)
# 批量清理過期會話
cleaned_count = 0
for session_id in expired_sessions:
try:
session = self.sessions.get(session_id)
if session:
# 使用增強清理方法
session._cleanup_sync_enhanced(CleanupReason.EXPIRED)
del self.sessions[session_id]
cleaned_count += 1
# 如果清理的是當前活躍會話,清空當前會話
if self.current_session and self.current_session.session_id == session_id:
self.current_session = None
debug_log("清空過期的當前活躍會話")
except Exception as e:
error_id = ErrorHandler.log_error_with_context(
e,
context={"session_id": session_id, "operation": "清理過期會話"},
error_type=ErrorType.SYSTEM
)
debug_log(f"清理過期會話 {session_id} 失敗 [錯誤ID: {error_id}]: {e}")
# 更新統計
cleanup_duration = time.time() - cleanup_start_time
self.cleanup_stats.update({
"total_cleanups": self.cleanup_stats["total_cleanups"] + 1,
"expired_cleanups": self.cleanup_stats["expired_cleanups"] + 1,
"last_cleanup_time": datetime.now().isoformat(),
"total_cleanup_duration": self.cleanup_stats["total_cleanup_duration"] + cleanup_duration,
"sessions_cleaned": self.cleanup_stats["sessions_cleaned"] + cleaned_count
})
if cleaned_count > 0:
debug_log(f"清理了 {cleaned_count} 個過期會話,耗時: {cleanup_duration:.2f}")
return cleaned_count
def cleanup_sessions_by_memory_pressure(self, force: bool = False) -> int:
"""根據內存壓力清理會話"""
cleanup_start_time = time.time()
sessions_to_clean = []
# 根據優先級選擇要清理的會話
# 優先級:已完成 > 已提交反饋 > 錯誤狀態 > 空閒時間最長
for session_id, session in self.sessions.items():
# 跳過當前活躍會話(除非強制清理)
if not force and self.current_session and session.session_id == self.current_session.session_id:
continue
# 優先清理已完成或錯誤狀態的會話
if session.status in [SessionStatus.COMPLETED, SessionStatus.ERROR, SessionStatus.TIMEOUT]:
sessions_to_clean.append((session_id, session, 1)) # 高優先級
elif session.status == SessionStatus.FEEDBACK_SUBMITTED:
# 已提交反饋但空閒時間較長的會話
if session.get_idle_time() > 300: # 5分鐘空閒
sessions_to_clean.append((session_id, session, 2)) # 中優先級
elif session.get_idle_time() > 600: # 10分鐘空閒
sessions_to_clean.append((session_id, session, 3)) # 低優先級
# 按優先級排序
sessions_to_clean.sort(key=lambda x: x[2])
# 清理會話(限制數量避免過度清理)
max_cleanup = min(len(sessions_to_clean), 5 if not force else len(sessions_to_clean))
cleaned_count = 0
for i in range(max_cleanup):
session_id, session, priority = sessions_to_clean[i]
try:
# 使用增強清理方法
session._cleanup_sync_enhanced(CleanupReason.MEMORY_PRESSURE)
del self.sessions[session_id]
cleaned_count += 1
# 如果清理的是當前活躍會話,清空當前會話
if self.current_session and self.current_session.session_id == session_id:
self.current_session = None
debug_log("因內存壓力清空當前活躍會話")
except Exception as e:
error_id = ErrorHandler.log_error_with_context(
e,
context={"session_id": session_id, "operation": "內存壓力清理"},
error_type=ErrorType.SYSTEM
)
debug_log(f"內存壓力清理會話 {session_id} 失敗 [錯誤ID: {error_id}]: {e}")
# 更新統計
cleanup_duration = time.time() - cleanup_start_time
self.cleanup_stats.update({
"total_cleanups": self.cleanup_stats["total_cleanups"] + 1,
"memory_pressure_cleanups": self.cleanup_stats["memory_pressure_cleanups"] + 1,
"last_cleanup_time": datetime.now().isoformat(),
"total_cleanup_duration": self.cleanup_stats["total_cleanup_duration"] + cleanup_duration,
"sessions_cleaned": self.cleanup_stats["sessions_cleaned"] + cleaned_count
})
if cleaned_count > 0:
debug_log(f"因內存壓力清理了 {cleaned_count} 個會話,耗時: {cleanup_duration:.2f}")
return cleaned_count
def get_session_cleanup_stats(self) -> dict:
"""獲取會話清理統計"""
stats = self.cleanup_stats.copy()
stats.update({
"active_sessions": len(self.sessions),
"current_session_id": self.current_session.session_id if self.current_session else None,
"expired_sessions": sum(1 for s in self.sessions.values() if s.is_expired()),
"idle_sessions": sum(1 for s in self.sessions.values() if s.get_idle_time() > 300),
"memory_usage_mb": 0 # 將在下面計算
})
# 計算內存使用(如果可能)
try:
import psutil
process = psutil.Process()
stats["memory_usage_mb"] = round(process.memory_info().rss / (1024 * 1024), 2)
except:
pass
return stats
def _scan_expired_sessions(self) -> List[str]:
"""掃描過期會話ID列表"""
expired_sessions = []
for session_id, session in self.sessions.items():
if session.is_expired():
expired_sessions.append(session_id)
return expired_sessions
def stop(self):
"""停止 Web UI 服務"""
# 清理所有會話
cleanup_start_time = time.time()
session_count = len(self.sessions)
for session in list(self.sessions.values()):
session.cleanup()
try:
session._cleanup_sync_enhanced(CleanupReason.SHUTDOWN)
except Exception as e:
debug_log(f"停止服務時清理會話失敗: {e}")
self.sessions.clear()
self.current_session = None
# 更新統計
cleanup_duration = time.time() - cleanup_start_time
self.cleanup_stats.update({
"total_cleanups": self.cleanup_stats["total_cleanups"] + 1,
"manual_cleanups": self.cleanup_stats["manual_cleanups"] + 1,
"last_cleanup_time": datetime.now().isoformat(),
"total_cleanup_duration": self.cleanup_stats["total_cleanup_duration"] + cleanup_duration,
"sessions_cleaned": self.cleanup_stats["sessions_cleaned"] + session_count
})
debug_log(f"停止服務時清理了 {session_count} 個會話,耗時: {cleanup_duration:.2f}")
# 停止伺服器注意uvicorn 的 graceful shutdown 需要額外處理)
if self.server_thread and self.server_thread.is_alive():

View File

@ -7,10 +7,12 @@ Web UI 資料模型模組
定義 Web UI 相關的資料結構和型別
"""
from .feedback_session import WebFeedbackSession
from .feedback_session import WebFeedbackSession, SessionStatus, CleanupReason
from .feedback_result import FeedbackResult
__all__ = [
'WebFeedbackSession',
'SessionStatus',
'CleanupReason',
'FeedbackResult'
]

View File

@ -11,14 +11,17 @@ import asyncio
import base64
import subprocess
import threading
import time
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Callable
from fastapi import WebSocket
from ...debug import web_debug_log as debug_log
from ...utils.resource_manager import get_resource_manager, register_process
from ...utils.error_handler import ErrorHandler, ErrorType
class SessionStatus(Enum):
@ -29,6 +32,17 @@ class SessionStatus(Enum):
COMPLETED = "completed" # 已完成
TIMEOUT = "timeout" # 超時
ERROR = "error" # 錯誤
EXPIRED = "expired" # 已過期
class CleanupReason(Enum):
"""清理原因枚舉"""
TIMEOUT = "timeout" # 超時清理
EXPIRED = "expired" # 過期清理
MEMORY_PRESSURE = "memory_pressure" # 內存壓力清理
MANUAL = "manual" # 手動清理
ERROR = "error" # 錯誤清理
SHUTDOWN = "shutdown" # 系統關閉清理
# 常數定義
MAX_IMAGE_SIZE = 1 * 1024 * 1024 # 1MB 圖片大小限制
@ -39,7 +53,8 @@ TEMP_DIR = Path.home() / ".cache" / "interactive-feedback-mcp-web"
class WebFeedbackSession:
"""Web 回饋會話管理"""
def __init__(self, session_id: str, project_directory: str, summary: str):
def __init__(self, session_id: str, project_directory: str, summary: str,
auto_cleanup_delay: int = 3600, max_idle_time: int = 1800):
self.session_id = session_id
self.project_directory = project_directory
self.summary = summary
@ -55,21 +70,49 @@ class WebFeedbackSession:
# 新增:會話狀態管理
self.status = SessionStatus.WAITING
self.status_message = "等待用戶回饋"
self.created_at = asyncio.get_event_loop().time()
# 統一使用 time.time() 以避免時間基準不一致
self.created_at = time.time()
self.last_activity = self.created_at
# 新增:自動清理配置
self.auto_cleanup_delay = auto_cleanup_delay # 自動清理延遲時間(秒)
self.max_idle_time = max_idle_time # 最大空閒時間(秒)
self.cleanup_timer: Optional[threading.Timer] = None
self.cleanup_callbacks: List[Callable] = [] # 清理回調函數列表
# 新增:清理統計
self.cleanup_stats = {
"cleanup_count": 0,
"last_cleanup_time": None,
"cleanup_reason": None,
"cleanup_duration": 0.0,
"memory_freed": 0,
"resources_cleaned": 0
}
# 確保臨時目錄存在
TEMP_DIR.mkdir(parents=True, exist_ok=True)
# 獲取資源管理器實例
self.resource_manager = get_resource_manager()
# 啟動自動清理定時器
self._schedule_auto_cleanup()
debug_log(f"會話 {self.session_id} 初始化完成,自動清理延遲: {auto_cleanup_delay}秒,最大空閒: {max_idle_time}")
def update_status(self, status: SessionStatus, message: str = None):
"""更新會話狀態"""
self.status = status
if message:
self.status_message = message
self.last_activity = asyncio.get_event_loop().time()
# 統一使用 time.time()
self.last_activity = time.time()
# 如果會話變為活躍狀態,重置清理定時器
if status in [SessionStatus.ACTIVE, SessionStatus.FEEDBACK_SUBMITTED]:
self._schedule_auto_cleanup()
debug_log(f"會話 {self.session_id} 狀態更新: {status.value} - {self.status_message}")
def get_status_info(self) -> dict:
@ -90,6 +133,117 @@ class WebFeedbackSession:
"""檢查會話是否活躍"""
return self.status in [SessionStatus.WAITING, SessionStatus.ACTIVE, SessionStatus.FEEDBACK_SUBMITTED]
def is_expired(self) -> bool:
"""檢查會話是否已過期"""
# 統一使用 time.time()
current_time = time.time()
# 檢查是否超過最大空閒時間
idle_time = current_time - self.last_activity
if idle_time > self.max_idle_time:
debug_log(f"會話 {self.session_id} 空閒時間過長: {idle_time:.1f}秒 > {self.max_idle_time}")
return True
# 檢查是否處於已過期狀態
if self.status == SessionStatus.EXPIRED:
return True
# 檢查是否處於錯誤或超時狀態且超過一定時間
if self.status in [SessionStatus.ERROR, SessionStatus.TIMEOUT]:
error_time = current_time - self.last_activity
if error_time > 300: # 錯誤狀態超過5分鐘視為過期
debug_log(f"會話 {self.session_id} 錯誤狀態時間過長: {error_time:.1f}")
return True
return False
def get_age(self) -> float:
"""獲取會話年齡(秒)"""
current_time = time.time()
return current_time - self.created_at
def get_idle_time(self) -> float:
"""獲取會話空閒時間(秒)"""
current_time = time.time()
return current_time - self.last_activity
def _schedule_auto_cleanup(self):
"""安排自動清理定時器"""
if self.cleanup_timer:
self.cleanup_timer.cancel()
def auto_cleanup():
"""自動清理回調"""
try:
if not self._cleanup_done and self.is_expired():
debug_log(f"會話 {self.session_id} 觸發自動清理(過期)")
# 使用異步方式執行清理
import asyncio
try:
loop = asyncio.get_event_loop()
loop.create_task(self._cleanup_resources_enhanced(CleanupReason.EXPIRED))
except RuntimeError:
# 如果沒有事件循環,使用同步清理
self._cleanup_sync_enhanced(CleanupReason.EXPIRED)
else:
# 如果還沒過期,重新安排定時器
self._schedule_auto_cleanup()
except Exception as e:
error_id = ErrorHandler.log_error_with_context(
e,
context={"session_id": self.session_id, "operation": "自動清理"},
error_type=ErrorType.SYSTEM
)
debug_log(f"自動清理失敗 [錯誤ID: {error_id}]: {e}")
self.cleanup_timer = threading.Timer(self.auto_cleanup_delay, auto_cleanup)
self.cleanup_timer.daemon = True
self.cleanup_timer.start()
debug_log(f"會話 {self.session_id} 自動清理定時器已設置,{self.auto_cleanup_delay}秒後觸發")
def extend_cleanup_timer(self, additional_time: int = None):
"""延長清理定時器"""
if additional_time is None:
additional_time = self.auto_cleanup_delay
if self.cleanup_timer:
self.cleanup_timer.cancel()
self.cleanup_timer = threading.Timer(additional_time, lambda: None)
self.cleanup_timer.daemon = True
self.cleanup_timer.start()
debug_log(f"會話 {self.session_id} 清理定時器已延長 {additional_time}")
def add_cleanup_callback(self, callback: Callable):
"""添加清理回調函數"""
if callback not in self.cleanup_callbacks:
self.cleanup_callbacks.append(callback)
debug_log(f"會話 {self.session_id} 添加清理回調函數")
def remove_cleanup_callback(self, callback: Callable):
"""移除清理回調函數"""
if callback in self.cleanup_callbacks:
self.cleanup_callbacks.remove(callback)
debug_log(f"會話 {self.session_id} 移除清理回調函數")
def get_cleanup_stats(self) -> dict:
"""獲取清理統計信息"""
stats = self.cleanup_stats.copy()
stats.update({
"session_id": self.session_id,
"age": self.get_age(),
"idle_time": self.get_idle_time(),
"is_expired": self.is_expired(),
"is_active": self.is_active(),
"status": self.status.value,
"has_websocket": self.websocket is not None,
"has_process": self.process is not None,
"command_logs_count": len(self.command_logs),
"images_count": len(self.images)
})
return stats
async def wait_for_feedback(self, timeout: int = 600) -> dict:
"""
等待用戶回饋包含圖片支援超時自動清理
@ -321,33 +475,72 @@ class WebFeedbackSession:
pass
async def _cleanup_resources_on_timeout(self):
"""超時時清理所有資源"""
"""超時時清理所有資源(保持向後兼容)"""
await self._cleanup_resources_enhanced(CleanupReason.TIMEOUT)
async def _cleanup_resources_enhanced(self, reason: CleanupReason):
"""增強的資源清理方法"""
if self._cleanup_done:
return # 避免重複清理
cleanup_start_time = time.time()
self._cleanup_done = True
debug_log(f"開始清理會話 {self.session_id} 的資源...")
debug_log(f"開始清理會話 {self.session_id} 的資源,原因: {reason.value}")
# 更新清理統計
self.cleanup_stats["cleanup_count"] += 1
self.cleanup_stats["cleanup_reason"] = reason.value
self.cleanup_stats["last_cleanup_time"] = datetime.now().isoformat()
resources_cleaned = 0
memory_before = 0
try:
# 1. 關閉 WebSocket 連接
# 記錄清理前的內存使用(如果可能)
try:
import psutil
process = psutil.Process()
memory_before = process.memory_info().rss
except:
pass
# 1. 取消自動清理定時器
if self.cleanup_timer:
self.cleanup_timer.cancel()
self.cleanup_timer = None
resources_cleaned += 1
# 2. 關閉 WebSocket 連接
if self.websocket:
try:
# 先通知前端超時
# 根據清理原因發送不同的通知消息
message_map = {
CleanupReason.TIMEOUT: "會話已超時,介面將自動關閉",
CleanupReason.EXPIRED: "會話已過期,介面將自動關閉",
CleanupReason.MEMORY_PRESSURE: "系統內存不足,會話將被清理",
CleanupReason.MANUAL: "會話已被手動清理",
CleanupReason.ERROR: "會話發生錯誤,將被清理",
CleanupReason.SHUTDOWN: "系統正在關閉,會話將被清理"
}
await self.websocket.send_json({
"type": "session_timeout",
"message": "會話已超時,介面將自動關閉"
"type": "session_cleanup",
"reason": reason.value,
"message": message_map.get(reason, "會話將被清理")
})
await asyncio.sleep(0.1) # 給前端一點時間處理消息
# 安全關閉 WebSocket
await self._safe_close_websocket()
debug_log(f"會話 {self.session_id} WebSocket 已關閉")
resources_cleaned += 1
except Exception as e:
debug_log(f"關閉 WebSocket 時發生錯誤: {e}")
finally:
self.websocket = None
# 2. 終止正在運行的命令進程
# 3. 終止正在運行的命令進程
if self.process:
try:
self.process.terminate()
@ -357,67 +550,213 @@ class WebFeedbackSession:
except subprocess.TimeoutExpired:
self.process.kill()
debug_log(f"會話 {self.session_id} 命令進程已強制終止")
resources_cleaned += 1
except Exception as e:
debug_log(f"終止命令進程時發生錯誤: {e}")
finally:
self.process = None
# 3. 設置完成事件(防止其他地方還在等待)
# 4. 設置完成事件(防止其他地方還在等待)
self.feedback_completed.set()
# 4. 清理臨時數據
# 5. 清理臨時數據
logs_count = len(self.command_logs)
images_count = len(self.images)
self.command_logs.clear()
self.images.clear()
self.settings.clear()
debug_log(f"會話 {self.session_id} 資源清理完成")
if logs_count > 0 or images_count > 0:
resources_cleaned += logs_count + images_count
debug_log(f"清理了 {logs_count} 條日誌和 {images_count} 張圖片")
# 6. 更新會話狀態
if reason == CleanupReason.EXPIRED:
self.status = SessionStatus.EXPIRED
elif reason == CleanupReason.TIMEOUT:
self.status = SessionStatus.TIMEOUT
elif reason == CleanupReason.ERROR:
self.status = SessionStatus.ERROR
else:
self.status = SessionStatus.COMPLETED
# 7. 調用清理回調函數
for callback in self.cleanup_callbacks:
try:
if asyncio.iscoroutinefunction(callback):
await callback(self, reason)
else:
callback(self, reason)
except Exception as e:
debug_log(f"清理回調執行失敗: {e}")
# 8. 計算清理效果
cleanup_duration = time.time() - cleanup_start_time
memory_after = 0
try:
import psutil
process = psutil.Process()
memory_after = process.memory_info().rss
except:
pass
memory_freed = max(0, memory_before - memory_after)
# 更新清理統計
self.cleanup_stats.update({
"cleanup_duration": cleanup_duration,
"memory_freed": memory_freed,
"resources_cleaned": resources_cleaned
})
debug_log(f"會話 {self.session_id} 資源清理完成,耗時: {cleanup_duration:.2f}秒,"
f"清理資源: {resources_cleaned}個,釋放內存: {memory_freed}字節")
except Exception as e:
debug_log(f"清理會話 {self.session_id} 資源時發生錯誤: {e}")
error_id = ErrorHandler.log_error_with_context(
e,
context={
"session_id": self.session_id,
"cleanup_reason": reason.value,
"operation": "增強資源清理"
},
error_type=ErrorType.SYSTEM
)
debug_log(f"清理會話 {self.session_id} 資源時發生錯誤 [錯誤ID: {error_id}]: {e}")
# 即使發生錯誤也要更新統計
self.cleanup_stats["cleanup_duration"] = time.time() - cleanup_start_time
def _cleanup_sync(self):
"""同步清理會話資源(但保留 WebSocket 連接)"""
if self._cleanup_done:
"""同步清理會話資源(但保留 WebSocket 連接)- 保持向後兼容"""
self._cleanup_sync_enhanced(CleanupReason.MANUAL, preserve_websocket=True)
def _cleanup_sync_enhanced(self, reason: CleanupReason, preserve_websocket: bool = False):
"""增強的同步清理會話資源"""
if self._cleanup_done and not preserve_websocket:
return
debug_log(f"同步清理會話 {self.session_id} 資源(保留 WebSocket...")
cleanup_start_time = time.time()
debug_log(f"同步清理會話 {self.session_id} 資源,原因: {reason.value}保留WebSocket: {preserve_websocket}")
# 只清理進程,不清理 WebSocket 連接
# 更新清理統計
self.cleanup_stats["cleanup_count"] += 1
self.cleanup_stats["cleanup_reason"] = reason.value
self.cleanup_stats["last_cleanup_time"] = datetime.now().isoformat()
resources_cleaned = 0
memory_before = 0
try:
# 記錄清理前的內存使用
try:
import psutil
process = psutil.Process()
memory_before = process.memory_info().rss
except:
pass
# 1. 取消自動清理定時器
if self.cleanup_timer:
self.cleanup_timer.cancel()
self.cleanup_timer = None
resources_cleaned += 1
# 2. 清理進程
if self.process:
try:
self.process.terminate()
self.process.wait(timeout=5)
debug_log(f"會話 {self.session_id} 命令進程已正常終止")
resources_cleaned += 1
except:
try:
self.process.kill()
debug_log(f"會話 {self.session_id} 命令進程已強制終止")
resources_cleaned += 1
except:
pass
self.process = None
# 清理臨時數據
# 3. 清理臨時數據
logs_count = len(self.command_logs)
images_count = len(self.images)
self.command_logs.clear()
# 注意:不設置 _cleanup_done = True因為還需要清理 WebSocket
if not preserve_websocket:
self.images.clear()
self.settings.clear()
resources_cleaned += images_count
resources_cleaned += logs_count
# 4. 設置完成事件
if not preserve_websocket:
self.feedback_completed.set()
# 5. 更新狀態
if not preserve_websocket:
if reason == CleanupReason.EXPIRED:
self.status = SessionStatus.EXPIRED
elif reason == CleanupReason.TIMEOUT:
self.status = SessionStatus.TIMEOUT
elif reason == CleanupReason.ERROR:
self.status = SessionStatus.ERROR
else:
self.status = SessionStatus.COMPLETED
self._cleanup_done = True
# 6. 調用清理回調函數(同步版本)
for callback in self.cleanup_callbacks:
try:
if not asyncio.iscoroutinefunction(callback):
callback(self, reason)
except Exception as e:
debug_log(f"同步清理回調執行失敗: {e}")
# 7. 計算清理效果
cleanup_duration = time.time() - cleanup_start_time
memory_after = 0
try:
import psutil
process = psutil.Process()
memory_after = process.memory_info().rss
except:
pass
memory_freed = max(0, memory_before - memory_after)
# 更新清理統計
self.cleanup_stats.update({
"cleanup_duration": cleanup_duration,
"memory_freed": memory_freed,
"resources_cleaned": resources_cleaned
})
debug_log(f"會話 {self.session_id} 同步清理完成,耗時: {cleanup_duration:.2f}秒,"
f"清理資源: {resources_cleaned}個,釋放內存: {memory_freed}字節")
except Exception as e:
error_id = ErrorHandler.log_error_with_context(
e,
context={
"session_id": self.session_id,
"cleanup_reason": reason.value,
"preserve_websocket": preserve_websocket,
"operation": "同步資源清理"
},
error_type=ErrorType.SYSTEM
)
debug_log(f"同步清理會話 {self.session_id} 資源時發生錯誤 [錯誤ID: {error_id}]: {e}")
# 即使發生錯誤也要更新統計
self.cleanup_stats["cleanup_duration"] = time.time() - cleanup_start_time
def cleanup(self):
"""同步清理會話資源(保持向後兼容)"""
if self._cleanup_done:
return
self._cleanup_done = True
debug_log(f"同步清理會話 {self.session_id} 資源...")
if self.process:
try:
self.process.terminate()
self.process.wait(timeout=5)
except:
try:
self.process.kill()
except:
pass
self.process = None
# 設置完成事件
self.feedback_completed.set()
self._cleanup_sync_enhanced(CleanupReason.MANUAL)
async def _safe_close_websocket(self):
"""安全關閉 WebSocket 連接,避免事件循環衝突"""

View File

@ -0,0 +1,504 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
會話清理管理器
==============
統一管理 Web 會話的清理策略統計和性能監控
與內存監控系統深度集成提供智能清理決策
"""
import time
import threading
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Callable, Any
from dataclasses import dataclass, field
from enum import Enum
from ...debug import web_debug_log as debug_log
from ...utils.error_handler import ErrorHandler, ErrorType
from ..models.feedback_session import CleanupReason, SessionStatus
@dataclass
class CleanupPolicy:
"""清理策略配置"""
max_idle_time: int = 1800 # 最大空閒時間(秒)
max_session_age: int = 7200 # 最大會話年齡(秒)
max_sessions: int = 10 # 最大會話數量
cleanup_interval: int = 300 # 清理間隔(秒)
memory_pressure_threshold: float = 0.8 # 內存壓力閾值
enable_auto_cleanup: bool = True # 啟用自動清理
preserve_active_session: bool = True # 保護活躍會話
@dataclass
class CleanupStats:
"""清理統計數據"""
total_cleanups: int = 0
expired_cleanups: int = 0
memory_pressure_cleanups: int = 0
manual_cleanups: int = 0
auto_cleanups: int = 0
total_sessions_cleaned: int = 0
total_cleanup_time: float = 0.0
average_cleanup_time: float = 0.0
last_cleanup_time: Optional[datetime] = None
cleanup_efficiency: float = 0.0 # 清理效率(清理的會話數/總會話數)
class CleanupTrigger(Enum):
"""清理觸發器類型"""
AUTO = "auto" # 自動清理
MEMORY_PRESSURE = "memory_pressure" # 內存壓力
MANUAL = "manual" # 手動清理
EXPIRED = "expired" # 過期清理
CAPACITY = "capacity" # 容量限制
class SessionCleanupManager:
"""會話清理管理器"""
def __init__(self, web_ui_manager, policy: CleanupPolicy = None):
"""
初始化會話清理管理器
Args:
web_ui_manager: WebUIManager 實例
policy: 清理策略配置
"""
self.web_ui_manager = web_ui_manager
self.policy = policy or CleanupPolicy()
self.stats = CleanupStats()
# 清理狀態
self.is_running = False
self.cleanup_thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
# 回調函數
self.cleanup_callbacks: List[Callable] = []
self.stats_callbacks: List[Callable] = []
# 清理歷史記錄
self.cleanup_history: List[Dict[str, Any]] = []
self.max_history = 100
debug_log("SessionCleanupManager 初始化完成")
def start_auto_cleanup(self) -> bool:
"""啟動自動清理"""
if not self.policy.enable_auto_cleanup:
debug_log("自動清理已禁用")
return False
if self.is_running:
debug_log("自動清理已在運行")
return True
try:
self.is_running = True
self._stop_event.clear()
self.cleanup_thread = threading.Thread(
target=self._auto_cleanup_loop,
name="SessionCleanupManager",
daemon=True
)
self.cleanup_thread.start()
debug_log(f"自動清理已啟動,間隔 {self.policy.cleanup_interval}")
return True
except Exception as e:
self.is_running = False
error_id = ErrorHandler.log_error_with_context(
e,
context={"operation": "啟動自動清理"},
error_type=ErrorType.SYSTEM
)
debug_log(f"啟動自動清理失敗 [錯誤ID: {error_id}]: {e}")
return False
def stop_auto_cleanup(self) -> bool:
"""停止自動清理"""
if not self.is_running:
debug_log("自動清理未在運行")
return True
try:
self.is_running = False
self._stop_event.set()
if self.cleanup_thread and self.cleanup_thread.is_alive():
self.cleanup_thread.join(timeout=5)
debug_log("自動清理已停止")
return True
except Exception as e:
error_id = ErrorHandler.log_error_with_context(
e,
context={"operation": "停止自動清理"},
error_type=ErrorType.SYSTEM
)
debug_log(f"停止自動清理失敗 [錯誤ID: {error_id}]: {e}")
return False
def _auto_cleanup_loop(self):
"""自動清理主循環"""
debug_log("自動清理循環開始")
while not self._stop_event.is_set():
try:
# 執行清理檢查
self._perform_auto_cleanup()
# 等待下次清理
if self._stop_event.wait(self.policy.cleanup_interval):
break
except Exception as e:
error_id = ErrorHandler.log_error_with_context(
e,
context={"operation": "自動清理循環"},
error_type=ErrorType.SYSTEM
)
debug_log(f"自動清理循環錯誤 [錯誤ID: {error_id}]: {e}")
# 發生錯誤時等待較短時間後重試
if self._stop_event.wait(30):
break
debug_log("自動清理循環結束")
def _perform_auto_cleanup(self):
"""執行自動清理"""
cleanup_start_time = time.time()
cleaned_sessions = 0
try:
# 1. 檢查會話數量限制
if len(self.web_ui_manager.sessions) > self.policy.max_sessions:
cleaned = self._cleanup_by_capacity()
cleaned_sessions += cleaned
debug_log(f"容量限制清理了 {cleaned} 個會話")
# 2. 清理過期會話
cleaned = self._cleanup_expired_sessions()
cleaned_sessions += cleaned
# 3. 清理空閒會話
cleaned = self._cleanup_idle_sessions()
cleaned_sessions += cleaned
# 4. 更新統計
cleanup_duration = time.time() - cleanup_start_time
self._update_cleanup_stats(
CleanupTrigger.AUTO,
cleaned_sessions,
cleanup_duration
)
if cleaned_sessions > 0:
debug_log(f"自動清理完成,清理了 {cleaned_sessions} 個會話,耗時: {cleanup_duration:.2f}")
except Exception as e:
error_id = ErrorHandler.log_error_with_context(
e,
context={"operation": "執行自動清理"},
error_type=ErrorType.SYSTEM
)
debug_log(f"執行自動清理失敗 [錯誤ID: {error_id}]: {e}")
def trigger_cleanup(self, trigger: CleanupTrigger, force: bool = False) -> int:
"""觸發清理操作"""
cleanup_start_time = time.time()
cleaned_sessions = 0
try:
debug_log(f"觸發清理操作,觸發器: {trigger.value},強制: {force}")
if trigger == CleanupTrigger.MEMORY_PRESSURE:
cleaned_sessions = self.web_ui_manager.cleanup_sessions_by_memory_pressure(force)
elif trigger == CleanupTrigger.EXPIRED:
cleaned_sessions = self.web_ui_manager.cleanup_expired_sessions()
elif trigger == CleanupTrigger.CAPACITY:
cleaned_sessions = self._cleanup_by_capacity()
elif trigger == CleanupTrigger.MANUAL:
# 手動清理:組合多種策略
cleaned_sessions += self.web_ui_manager.cleanup_expired_sessions()
if force:
cleaned_sessions += self.web_ui_manager.cleanup_sessions_by_memory_pressure(force)
else:
# 自動清理
self._perform_auto_cleanup()
return 0 # 統計已在 _perform_auto_cleanup 中更新
# 更新統計
cleanup_duration = time.time() - cleanup_start_time
self._update_cleanup_stats(trigger, cleaned_sessions, cleanup_duration)
debug_log(f"清理操作完成,清理了 {cleaned_sessions} 個會話,耗時: {cleanup_duration:.2f}")
return cleaned_sessions
except Exception as e:
error_id = ErrorHandler.log_error_with_context(
e,
context={"operation": "觸發清理", "trigger": trigger.value, "force": force},
error_type=ErrorType.SYSTEM
)
debug_log(f"觸發清理操作失敗 [錯誤ID: {error_id}]: {e}")
return 0
def _cleanup_by_capacity(self) -> int:
"""根據容量限制清理會話"""
sessions = self.web_ui_manager.sessions
if len(sessions) <= self.policy.max_sessions:
return 0
# 計算需要清理的會話數量
excess_count = len(sessions) - self.policy.max_sessions
# 按優先級排序會話(優先清理舊的、非活躍的會話)
session_priorities = []
for session_id, session in sessions.items():
# 跳過當前活躍會話(如果啟用保護)
if (self.policy.preserve_active_session and
self.web_ui_manager.current_session and
session.session_id == self.web_ui_manager.current_session.session_id):
continue
# 計算優先級分數(分數越高越優先清理)
priority_score = 0
# 狀態優先級
if session.status in [SessionStatus.COMPLETED, SessionStatus.ERROR, SessionStatus.TIMEOUT]:
priority_score += 100
elif session.status == SessionStatus.FEEDBACK_SUBMITTED:
priority_score += 50
# 年齡優先級
age = session.get_age()
priority_score += age / 60 # 每分鐘加1分
# 空閒時間優先級
idle_time = session.get_idle_time()
priority_score += idle_time / 30 # 每30秒加1分
session_priorities.append((session_id, session, priority_score))
# 按優先級排序並清理
session_priorities.sort(key=lambda x: x[2], reverse=True)
cleaned_count = 0
for i in range(min(excess_count, len(session_priorities))):
session_id, session, _ = session_priorities[i]
try:
session._cleanup_sync_enhanced(CleanupReason.MANUAL)
del self.web_ui_manager.sessions[session_id]
cleaned_count += 1
except Exception as e:
debug_log(f"容量清理會話 {session_id} 失敗: {e}")
return cleaned_count
def _cleanup_expired_sessions(self) -> int:
"""清理過期會話"""
expired_sessions = []
current_time = time.time()
for session_id, session in self.web_ui_manager.sessions.items():
# 檢查是否過期
if session.is_expired():
expired_sessions.append(session_id)
# 檢查是否超過最大年齡
elif session.get_age() > self.policy.max_session_age:
expired_sessions.append(session_id)
# 清理過期會話
cleaned_count = 0
for session_id in expired_sessions:
try:
session = self.web_ui_manager.sessions.get(session_id)
if session:
session._cleanup_sync_enhanced(CleanupReason.EXPIRED)
del self.web_ui_manager.sessions[session_id]
cleaned_count += 1
# 如果清理的是當前活躍會話,清空當前會話
if (self.web_ui_manager.current_session and
self.web_ui_manager.current_session.session_id == session_id):
self.web_ui_manager.current_session = None
except Exception as e:
debug_log(f"清理過期會話 {session_id} 失敗: {e}")
return cleaned_count
def _cleanup_idle_sessions(self) -> int:
"""清理空閒會話"""
idle_sessions = []
for session_id, session in self.web_ui_manager.sessions.items():
# 跳過當前活躍會話(如果啟用保護)
if (self.policy.preserve_active_session and
self.web_ui_manager.current_session and
session.session_id == self.web_ui_manager.current_session.session_id):
continue
# 檢查是否空閒時間過長
if session.get_idle_time() > self.policy.max_idle_time:
idle_sessions.append(session_id)
# 清理空閒會話
cleaned_count = 0
for session_id in idle_sessions:
try:
session = self.web_ui_manager.sessions.get(session_id)
if session:
session._cleanup_sync_enhanced(CleanupReason.EXPIRED)
del self.web_ui_manager.sessions[session_id]
cleaned_count += 1
except Exception as e:
debug_log(f"清理空閒會話 {session_id} 失敗: {e}")
return cleaned_count
def _update_cleanup_stats(self, trigger: CleanupTrigger, cleaned_count: int, duration: float):
"""更新清理統計"""
self.stats.total_cleanups += 1
self.stats.total_sessions_cleaned += cleaned_count
self.stats.total_cleanup_time += duration
self.stats.last_cleanup_time = datetime.now()
# 更新平均清理時間
if self.stats.total_cleanups > 0:
self.stats.average_cleanup_time = self.stats.total_cleanup_time / self.stats.total_cleanups
# 更新清理效率
total_sessions = len(self.web_ui_manager.sessions) + cleaned_count
if total_sessions > 0:
self.stats.cleanup_efficiency = cleaned_count / total_sessions
# 根據觸發器類型更新統計
if trigger == CleanupTrigger.AUTO:
self.stats.auto_cleanups += 1
elif trigger == CleanupTrigger.MEMORY_PRESSURE:
self.stats.memory_pressure_cleanups += 1
elif trigger == CleanupTrigger.EXPIRED:
self.stats.expired_cleanups += 1
elif trigger == CleanupTrigger.MANUAL:
self.stats.manual_cleanups += 1
# 記錄清理歷史
cleanup_record = {
"timestamp": datetime.now().isoformat(),
"trigger": trigger.value,
"cleaned_count": cleaned_count,
"duration": duration,
"total_sessions_before": total_sessions,
"total_sessions_after": len(self.web_ui_manager.sessions)
}
self.cleanup_history.append(cleanup_record)
# 限制歷史記錄數量
if len(self.cleanup_history) > self.max_history:
self.cleanup_history = self.cleanup_history[-self.max_history:]
# 調用統計回調
for callback in self.stats_callbacks:
try:
callback(self.stats, cleanup_record)
except Exception as e:
debug_log(f"統計回調執行失敗: {e}")
def get_cleanup_statistics(self) -> Dict[str, Any]:
"""獲取清理統計數據"""
stats_dict = {
"total_cleanups": self.stats.total_cleanups,
"expired_cleanups": self.stats.expired_cleanups,
"memory_pressure_cleanups": self.stats.memory_pressure_cleanups,
"manual_cleanups": self.stats.manual_cleanups,
"auto_cleanups": self.stats.auto_cleanups,
"total_sessions_cleaned": self.stats.total_sessions_cleaned,
"total_cleanup_time": round(self.stats.total_cleanup_time, 2),
"average_cleanup_time": round(self.stats.average_cleanup_time, 2),
"cleanup_efficiency": round(self.stats.cleanup_efficiency, 3),
"last_cleanup_time": self.stats.last_cleanup_time.isoformat() if self.stats.last_cleanup_time else None,
"is_auto_cleanup_running": self.is_running,
"current_sessions": len(self.web_ui_manager.sessions),
"policy": {
"max_idle_time": self.policy.max_idle_time,
"max_session_age": self.policy.max_session_age,
"max_sessions": self.policy.max_sessions,
"cleanup_interval": self.policy.cleanup_interval,
"enable_auto_cleanup": self.policy.enable_auto_cleanup,
"preserve_active_session": self.policy.preserve_active_session
}
}
return stats_dict
def get_cleanup_history(self, limit: int = 20) -> List[Dict[str, Any]]:
"""獲取清理歷史記錄"""
return self.cleanup_history[-limit:] if self.cleanup_history else []
def add_cleanup_callback(self, callback: Callable):
"""添加清理回調函數"""
if callback not in self.cleanup_callbacks:
self.cleanup_callbacks.append(callback)
debug_log("添加清理回調函數")
def add_stats_callback(self, callback: Callable):
"""添加統計回調函數"""
if callback not in self.stats_callbacks:
self.stats_callbacks.append(callback)
debug_log("添加統計回調函數")
def update_policy(self, **kwargs):
"""更新清理策略"""
for key, value in kwargs.items():
if hasattr(self.policy, key):
setattr(self.policy, key, value)
debug_log(f"更新清理策略 {key} = {value}")
else:
debug_log(f"未知的策略參數: {key}")
def reset_stats(self):
"""重置統計數據"""
self.stats = CleanupStats()
self.cleanup_history.clear()
debug_log("清理統計數據已重置")
def force_cleanup_all(self, exclude_current: bool = True) -> int:
"""強制清理所有會話"""
sessions_to_clean = []
for session_id, session in self.web_ui_manager.sessions.items():
# 是否排除當前活躍會話
if (exclude_current and
self.web_ui_manager.current_session and
session.session_id == self.web_ui_manager.current_session.session_id):
continue
sessions_to_clean.append(session_id)
# 清理會話
cleaned_count = 0
for session_id in sessions_to_clean:
try:
session = self.web_ui_manager.sessions.get(session_id)
if session:
session._cleanup_sync_enhanced(CleanupReason.MANUAL)
del self.web_ui_manager.sessions[session_id]
cleaned_count += 1
except Exception as e:
debug_log(f"強制清理會話 {session_id} 失敗: {e}")
# 更新統計
self._update_cleanup_stats(CleanupTrigger.MANUAL, cleaned_count, 0.0)
debug_log(f"強制清理完成,清理了 {cleaned_count} 個會話")
return cleaned_count

View File

@ -0,0 +1,366 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
會話清理優化測試
================
測試 WebFeedbackSession SessionCleanupManager 的清理功能
"""
import asyncio
import pytest
import time
import threading
from unittest.mock import Mock, patch, MagicMock
from datetime import datetime, timedelta
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from src.mcp_feedback_enhanced.web.models.feedback_session import (
WebFeedbackSession, SessionStatus, CleanupReason
)
from src.mcp_feedback_enhanced.web.utils.session_cleanup_manager import (
SessionCleanupManager, CleanupPolicy, CleanupTrigger
)
class TestWebFeedbackSessionCleanup:
"""測試 WebFeedbackSession 清理功能"""
def setup_method(self):
"""測試前設置"""
self.session_id = "test_session_001"
self.project_dir = "/tmp/test_project"
self.summary = "測試會話摘要"
# 創建測試會話
self.session = WebFeedbackSession(
self.session_id,
self.project_dir,
self.summary,
auto_cleanup_delay=60, # 1分鐘自動清理
max_idle_time=30 # 30秒最大空閒時間
)
def teardown_method(self):
"""測試後清理"""
if hasattr(self, 'session') and self.session:
try:
self.session._cleanup_sync_enhanced(CleanupReason.MANUAL)
except:
pass
def test_session_initialization(self):
"""測試會話初始化"""
assert self.session.session_id == self.session_id
assert self.session.project_directory == self.project_dir
assert self.session.summary == self.summary
assert self.session.status == SessionStatus.WAITING
assert self.session.auto_cleanup_delay == 60
assert self.session.max_idle_time == 30
assert self.session.cleanup_timer is not None
assert len(self.session.cleanup_stats) > 0
def test_is_expired_by_idle_time(self):
"""測試空閒時間過期檢測"""
# 新創建的會話不應該過期
assert not self.session.is_expired()
# 模擬空閒時間過長
self.session.last_activity = time.time() - 40 # 40秒前
assert self.session.is_expired()
def test_is_expired_by_status(self):
"""測試狀態過期檢測"""
# 設置為錯誤狀態
self.session.status = SessionStatus.ERROR
self.session.last_activity = time.time() - 400 # 400秒前
assert self.session.is_expired()
# 設置為已過期狀態
self.session.status = SessionStatus.EXPIRED
assert self.session.is_expired()
def test_get_age_and_idle_time(self):
"""測試年齡和空閒時間計算"""
# 測試年齡
age = self.session.get_age()
assert age >= 0
assert age < 1 # 剛創建應該小於1秒
# 測試空閒時間
idle_time = self.session.get_idle_time()
assert idle_time >= 0
assert idle_time < 1 # 剛創建應該小於1秒
def test_cleanup_timer_scheduling(self):
"""測試清理定時器調度"""
# 檢查定時器是否已設置
assert self.session.cleanup_timer is not None
assert self.session.cleanup_timer.is_alive()
# 測試延長定時器
old_timer = self.session.cleanup_timer
self.session.extend_cleanup_timer(120)
# 應該創建新的定時器
assert self.session.cleanup_timer != old_timer
assert self.session.cleanup_timer.is_alive()
def test_cleanup_callbacks(self):
"""測試清理回調函數"""
callback_called = False
callback_session = None
callback_reason = None
def test_callback(session, reason):
nonlocal callback_called, callback_session, callback_reason
callback_called = True
callback_session = session
callback_reason = reason
# 添加回調
self.session.add_cleanup_callback(test_callback)
assert len(self.session.cleanup_callbacks) == 1
# 執行清理
self.session._cleanup_sync_enhanced(CleanupReason.MANUAL)
# 檢查回調是否被調用
assert callback_called
assert callback_session == self.session
assert callback_reason == CleanupReason.MANUAL
# 移除回調
self.session.remove_cleanup_callback(test_callback)
assert len(self.session.cleanup_callbacks) == 0
def test_cleanup_stats(self):
"""測試清理統計"""
# 初始統計
stats = self.session.get_cleanup_stats()
assert stats["cleanup_count"] == 0
assert stats["session_id"] == self.session_id
assert stats["is_active"] == True
# 執行清理
self.session._cleanup_sync_enhanced(CleanupReason.EXPIRED)
# 檢查統計更新
stats = self.session.get_cleanup_stats()
assert stats["cleanup_count"] == 1
assert stats["cleanup_reason"] == CleanupReason.EXPIRED.value
assert stats["last_cleanup_time"] is not None
assert stats["cleanup_duration"] >= 0
@pytest.mark.asyncio
async def test_async_cleanup(self):
"""測試異步清理"""
# 模擬 WebSocket 連接
mock_websocket = Mock()
mock_websocket.send_json = Mock(return_value=asyncio.Future())
mock_websocket.send_json.return_value.set_result(None)
mock_websocket.close = Mock(return_value=asyncio.Future())
mock_websocket.close.return_value.set_result(None)
mock_websocket.client_state.DISCONNECTED = False
self.session.websocket = mock_websocket
# 執行異步清理
await self.session._cleanup_resources_enhanced(CleanupReason.TIMEOUT)
# 檢查 WebSocket 是否被正確處理
mock_websocket.send_json.assert_called_once()
# 檢查清理統計
stats = self.session.get_cleanup_stats()
assert stats["cleanup_count"] == 1
assert stats["cleanup_reason"] == CleanupReason.TIMEOUT.value
def test_status_update_resets_timer(self):
"""測試狀態更新重置定時器"""
old_timer = self.session.cleanup_timer
# 更新狀態為活躍
self.session.update_status(SessionStatus.ACTIVE, "測試活躍狀態")
# 檢查定時器是否被重置
assert self.session.cleanup_timer != old_timer
assert self.session.cleanup_timer.is_alive()
assert self.session.status == SessionStatus.ACTIVE
class TestSessionCleanupManager:
"""測試 SessionCleanupManager 功能"""
def setup_method(self):
"""測試前設置"""
# 創建模擬的 WebUIManager
self.mock_web_ui_manager = Mock()
self.mock_web_ui_manager.sessions = {}
self.mock_web_ui_manager.current_session = None
self.mock_web_ui_manager.cleanup_expired_sessions = Mock(return_value=0)
self.mock_web_ui_manager.cleanup_sessions_by_memory_pressure = Mock(return_value=0)
# 創建清理策略
self.policy = CleanupPolicy(
max_idle_time=30,
max_session_age=300,
max_sessions=5,
cleanup_interval=10,
enable_auto_cleanup=True
)
# 創建清理管理器
self.cleanup_manager = SessionCleanupManager(
self.mock_web_ui_manager,
self.policy
)
def teardown_method(self):
"""測試後清理"""
if hasattr(self, 'cleanup_manager'):
self.cleanup_manager.stop_auto_cleanup()
def test_cleanup_manager_initialization(self):
"""測試清理管理器初始化"""
assert self.cleanup_manager.web_ui_manager == self.mock_web_ui_manager
assert self.cleanup_manager.policy == self.policy
assert not self.cleanup_manager.is_running
assert self.cleanup_manager.cleanup_thread is None
assert len(self.cleanup_manager.cleanup_callbacks) == 0
assert len(self.cleanup_manager.cleanup_history) == 0
def test_auto_cleanup_start_stop(self):
"""測試自動清理啟動和停止"""
# 啟動自動清理
result = self.cleanup_manager.start_auto_cleanup()
assert result == True
assert self.cleanup_manager.is_running == True
assert self.cleanup_manager.cleanup_thread is not None
assert self.cleanup_manager.cleanup_thread.is_alive()
# 停止自動清理
result = self.cleanup_manager.stop_auto_cleanup()
assert result == True
assert self.cleanup_manager.is_running == False
def test_trigger_cleanup_memory_pressure(self):
"""測試內存壓力清理觸發"""
# 設置模擬返回值
self.mock_web_ui_manager.cleanup_sessions_by_memory_pressure.return_value = 3
# 觸發內存壓力清理
cleaned = self.cleanup_manager.trigger_cleanup(CleanupTrigger.MEMORY_PRESSURE, force=True)
# 檢查結果
assert cleaned == 3
self.mock_web_ui_manager.cleanup_sessions_by_memory_pressure.assert_called_once_with(True)
# 檢查統計更新
stats = self.cleanup_manager.get_cleanup_statistics()
assert stats["total_cleanups"] == 1
assert stats["memory_pressure_cleanups"] == 1
assert stats["total_sessions_cleaned"] == 3
def test_trigger_cleanup_expired(self):
"""測試過期清理觸發"""
# 設置模擬返回值
self.mock_web_ui_manager.cleanup_expired_sessions.return_value = 2
# 觸發過期清理
cleaned = self.cleanup_manager.trigger_cleanup(CleanupTrigger.EXPIRED)
# 檢查結果
assert cleaned == 2
self.mock_web_ui_manager.cleanup_expired_sessions.assert_called_once()
# 檢查統計更新
stats = self.cleanup_manager.get_cleanup_statistics()
assert stats["total_cleanups"] == 1
assert stats["expired_cleanups"] == 1
assert stats["total_sessions_cleaned"] == 2
def test_cleanup_statistics(self):
"""測試清理統計功能"""
# 初始統計
stats = self.cleanup_manager.get_cleanup_statistics()
assert stats["total_cleanups"] == 0
assert stats["total_sessions_cleaned"] == 0
assert stats["is_auto_cleanup_running"] == False
# 執行一些清理操作
self.mock_web_ui_manager.cleanup_expired_sessions.return_value = 1
self.cleanup_manager.trigger_cleanup(CleanupTrigger.EXPIRED)
self.mock_web_ui_manager.cleanup_sessions_by_memory_pressure.return_value = 2
self.cleanup_manager.trigger_cleanup(CleanupTrigger.MEMORY_PRESSURE)
# 檢查統計
stats = self.cleanup_manager.get_cleanup_statistics()
assert stats["total_cleanups"] == 2
assert stats["expired_cleanups"] == 1
assert stats["memory_pressure_cleanups"] == 1
assert stats["total_sessions_cleaned"] == 3
assert stats["average_cleanup_time"] >= 0
def test_cleanup_history(self):
"""測試清理歷史記錄"""
# 初始歷史為空
history = self.cleanup_manager.get_cleanup_history()
assert len(history) == 0
# 執行清理操作
self.mock_web_ui_manager.cleanup_expired_sessions.return_value = 1
self.cleanup_manager.trigger_cleanup(CleanupTrigger.EXPIRED)
# 檢查歷史記錄
history = self.cleanup_manager.get_cleanup_history()
assert len(history) == 1
record = history[0]
assert record["trigger"] == CleanupTrigger.EXPIRED.value
assert record["cleaned_count"] == 1
assert "timestamp" in record
assert "duration" in record
def test_policy_update(self):
"""測試策略更新"""
# 更新策略
self.cleanup_manager.update_policy(
max_idle_time=60,
max_sessions=10,
enable_auto_cleanup=False
)
# 檢查策略是否更新
assert self.cleanup_manager.policy.max_idle_time == 60
assert self.cleanup_manager.policy.max_sessions == 10
assert self.cleanup_manager.policy.enable_auto_cleanup == False
def test_stats_reset(self):
"""測試統計重置"""
# 執行一些操作產生統計
self.mock_web_ui_manager.cleanup_expired_sessions.return_value = 1
self.cleanup_manager.trigger_cleanup(CleanupTrigger.EXPIRED)
# 檢查有統計數據
stats = self.cleanup_manager.get_cleanup_statistics()
assert stats["total_cleanups"] > 0
# 重置統計
self.cleanup_manager.reset_stats()
# 檢查統計已重置
stats = self.cleanup_manager.get_cleanup_statistics()
assert stats["total_cleanups"] == 0
assert stats["total_sessions_cleaned"] == 0
history = self.cleanup_manager.get_cleanup_history()
assert len(history) == 0
if __name__ == "__main__":
pytest.main([__file__, "-v"])