新增增強版 MCP 測試系統,包含多場景測試、報告生成及狀態管理功能,並重構 Web UI 以支援單一活躍會話模式。更新相關文檔及樣式,提升使用體驗。

This commit is contained in:
Minidoracat 2025-06-06 16:44:24 +08:00
parent 740c0dbbb8
commit d5494943dd
19 changed files with 5216 additions and 1489 deletions

3
.gitignore vendored
View File

@ -18,4 +18,5 @@ venv*/
.cursor/rules/ .cursor/rules/
uv.lock uv.lock
.mcp_feedback_settings.json .mcp_feedback_settings.json
test_reports/

184
debug_websocket.html Normal file
View File

@ -0,0 +1,184 @@
<!DOCTYPE html>
<html lang="zh-TW">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>WebSocket 診斷工具</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
background: #1a1a1a;
color: #ffffff;
}
.container {
max-width: 800px;
margin: 0 auto;
}
.status {
padding: 10px;
margin: 10px 0;
border-radius: 5px;
}
.success { background: #2d5a2d; }
.error { background: #5a2d2d; }
.info { background: #2d4a5a; }
.warning { background: #5a5a2d; }
.log {
background: #2a2a2a;
padding: 15px;
border-radius: 5px;
margin: 10px 0;
font-family: monospace;
white-space: pre-wrap;
max-height: 400px;
overflow-y: auto;
}
button {
background: #4a90e2;
color: white;
border: none;
padding: 10px 20px;
border-radius: 5px;
cursor: pointer;
margin: 5px;
}
button:hover {
background: #357abd;
}
input {
padding: 8px;
margin: 5px;
border: 1px solid #555;
background: #333;
color: white;
border-radius: 3px;
}
</style>
</head>
<body>
<div class="container">
<h1>🔧 WebSocket 診斷工具</h1>
<div id="status" class="status info">
準備開始診斷...
</div>
<div>
<label>WebSocket URL:</label>
<input type="text" id="wsUrl" value="ws://127.0.0.1:8767/ws" style="width: 300px;">
<button onclick="testConnection()">🔗 測試連接</button>
<button onclick="clearLog()">🗑️ 清除日誌</button>
</div>
<div>
<label>發送消息:</label>
<input type="text" id="messageInput" placeholder='{"type": "get_status"}' style="width: 300px;">
<button onclick="sendMessage()">📤 發送</button>
</div>
<div id="log" class="log">等待操作...</div>
</div>
<script>
let websocket = null;
let logElement = document.getElementById('log');
let statusElement = document.getElementById('status');
function log(message, type = 'info') {
const timestamp = new Date().toLocaleTimeString();
logElement.textContent += `[${timestamp}] ${message}\n`;
logElement.scrollTop = logElement.scrollHeight;
// 更新狀態
statusElement.textContent = message;
statusElement.className = `status ${type}`;
}
function clearLog() {
logElement.textContent = '';
log('日誌已清除');
}
function testConnection() {
const url = document.getElementById('wsUrl').value;
if (websocket) {
log('關閉現有連接...', 'warning');
websocket.close();
websocket = null;
}
log(`嘗試連接到: ${url}`, 'info');
try {
websocket = new WebSocket(url);
websocket.onopen = function(event) {
log('✅ WebSocket 連接成功!', 'success');
};
websocket.onmessage = function(event) {
log(`📨 收到消息: ${event.data}`, 'success');
try {
const data = JSON.parse(event.data);
log(`📋 解析後的數據: ${JSON.stringify(data, null, 2)}`, 'info');
} catch (e) {
log(`⚠️ JSON 解析失敗: ${e.message}`, 'warning');
}
};
websocket.onclose = function(event) {
log(`🔌 連接已關閉 - Code: ${event.code}, Reason: ${event.reason}`, 'warning');
websocket = null;
};
websocket.onerror = function(error) {
log(`❌ WebSocket 錯誤: ${error}`, 'error');
console.error('WebSocket error:', error);
};
} catch (error) {
log(`❌ 連接失敗: ${error.message}`, 'error');
}
}
function sendMessage() {
const messageInput = document.getElementById('messageInput');
const message = messageInput.value.trim();
if (!message) {
log('⚠️ 請輸入要發送的消息', 'warning');
return;
}
if (!websocket || websocket.readyState !== WebSocket.OPEN) {
log('❌ WebSocket 未連接', 'error');
return;
}
try {
websocket.send(message);
log(`📤 已發送: ${message}`, 'info');
messageInput.value = '';
} catch (error) {
log(`❌ 發送失敗: ${error.message}`, 'error');
}
}
// 頁面加載時自動測試
window.onload = function() {
log('🚀 WebSocket 診斷工具已載入');
log('💡 點擊 "測試連接" 開始診斷');
};
// Enter 鍵發送消息
document.getElementById('messageInput').addEventListener('keydown', function(e) {
if (e.key === 'Enter') {
sendMessage();
}
});
</script>
</body>
</html>

View File

@ -26,6 +26,7 @@ dependencies = [
"uvicorn>=0.30.0", "uvicorn>=0.30.0",
"jinja2>=3.1.0", "jinja2>=3.1.0",
"websockets>=13.0.0", "websockets>=13.0.0",
"aiohttp>=3.8.0",
] ]
[project.optional-dependencies] [project.optional-dependencies]

View File

@ -30,6 +30,12 @@ def main():
test_parser = subparsers.add_parser('test', help='執行測試') test_parser = subparsers.add_parser('test', help='執行測試')
test_parser.add_argument('--web', action='store_true', help='測試 Web UI (自動持續運行)') test_parser.add_argument('--web', action='store_true', help='測試 Web UI (自動持續運行)')
test_parser.add_argument('--gui', action='store_true', help='測試 Qt GUI (快速測試)') test_parser.add_argument('--gui', action='store_true', help='測試 Qt GUI (快速測試)')
test_parser.add_argument('--enhanced', action='store_true', help='執行增強 MCP 測試 (推薦)')
test_parser.add_argument('--scenario', help='運行特定的測試場景')
test_parser.add_argument('--tags', help='根據標籤運行測試場景 (逗號分隔)')
test_parser.add_argument('--list-scenarios', action='store_true', help='列出所有可用的測試場景')
test_parser.add_argument('--report-format', choices=['html', 'json', 'markdown'], help='報告格式')
test_parser.add_argument('--timeout', type=int, help='測試超時時間 (秒)')
# 版本命令 # 版本命令
version_parser = subparsers.add_parser('version', help='顯示版本資訊') version_parser = subparsers.add_parser('version', help='顯示版本資訊')
@ -58,8 +64,54 @@ def run_tests(args):
"""執行測試""" """執行測試"""
# 啟用調試模式以顯示測試過程 # 啟用調試模式以顯示測試過程
os.environ["MCP_DEBUG"] = "true" os.environ["MCP_DEBUG"] = "true"
if args.web: if args.enhanced or args.scenario or args.tags or args.list_scenarios:
# 使用新的增強測試系統
print("🚀 執行增強 MCP 測試系統...")
import asyncio
from .test_mcp_enhanced import MCPTestRunner, TestConfig
# 創建配置
config = TestConfig.from_env()
if args.timeout:
config.test_timeout = args.timeout
if args.report_format:
config.report_format = args.report_format
runner = MCPTestRunner(config)
async def run_enhanced_tests():
try:
if args.list_scenarios:
# 列出測試場景
tags = args.tags.split(',') if args.tags else None
runner.list_scenarios(tags)
return True
success = False
if args.scenario:
# 運行特定場景
success = await runner.run_single_scenario(args.scenario)
elif args.tags:
# 根據標籤運行
tags = [tag.strip() for tag in args.tags.split(',')]
success = await runner.run_scenarios_by_tags(tags)
else:
# 運行所有場景
success = await runner.run_all_scenarios()
return success
except Exception as e:
print(f"❌ 增強測試執行失敗: {e}")
return False
success = asyncio.run(run_enhanced_tests())
if not success:
sys.exit(1)
elif args.web:
print("🧪 執行 Web UI 測試...") print("🧪 執行 Web UI 測試...")
from .test_web_ui import test_web_ui, interactive_demo from .test_web_ui import test_web_ui, interactive_demo
success, session_info = test_web_ui() success, session_info = test_web_ui()
@ -77,39 +129,33 @@ def run_tests(args):
if not test_qt_gui(): if not test_qt_gui():
sys.exit(1) sys.exit(1)
else: else:
# 執行所有測試 # 默認執行增強測試系統的快速測試
print("🧪 執行完整測試套件...") print("🧪 執行快速測試套件 (使用增強測試系統)...")
success = True print("💡 提示:使用 --enhanced 參數可執行完整測試")
session_info = None
import asyncio
try: from .test_mcp_enhanced import MCPTestRunner, TestConfig
from .test_web_ui import (
test_environment_detection, config = TestConfig.from_env()
test_new_parameters, config.test_timeout = 60 # 快速測試使用較短超時
test_mcp_integration,
test_web_ui, runner = MCPTestRunner(config)
interactive_demo
) async def run_quick_tests():
try:
if not test_environment_detection(): # 運行快速測試標籤
success = False success = await runner.run_scenarios_by_tags(["quick"])
if not test_new_parameters(): return success
success = False except Exception as e:
if not test_mcp_integration(): print(f"❌ 快速測試執行失敗: {e}")
success = False return False
web_success, session_info = test_web_ui() success = asyncio.run(run_quick_tests())
if not web_success:
success = False
except Exception as e:
print(f"❌ 測試執行失敗: {e}")
success = False
if not success: if not success:
sys.exit(1) sys.exit(1)
print("🎉 所有測試通過!") print("🎉 快速測試通過!")
print("💡 使用 'test --enhanced' 執行完整測試套件")
def show_version(): def show_version():
"""顯示版本資訊""" """顯示版本資訊"""

View File

@ -0,0 +1,286 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MCP 增強測試系統
================
完整的 MCP 測試框架模擬真實的 Cursor IDE 調用場景
主要功能
- 真實 MCP 調用模擬
- 完整的回饋循環測試
- 多場景測試覆蓋
- 詳細的測試報告
使用方法
python -m mcp_feedback_enhanced.test_mcp_enhanced
python -m mcp_feedback_enhanced.test_mcp_enhanced --scenario basic_workflow
python -m mcp_feedback_enhanced.test_mcp_enhanced --tags quick
"""
import asyncio
import argparse
import sys
import os
from typing import List, Optional
from pathlib import Path
# 添加專案根目錄到 Python 路徑
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
from .testing import TestScenarios, TestReporter, TestConfig, DEFAULT_CONFIG
from .debug import debug_log
class MCPTestRunner:
"""MCP 測試運行器"""
def __init__(self, config: Optional[TestConfig] = None):
self.config = config or DEFAULT_CONFIG
self.scenarios = TestScenarios(self.config)
self.reporter = TestReporter(self.config)
async def run_single_scenario(self, scenario_name: str) -> bool:
"""運行單個測試場景"""
debug_log(f"🎯 運行單個測試場景: {scenario_name}")
result = await self.scenarios.run_scenario(scenario_name)
# 生成報告
test_results = {
"success": result.get("success", False),
"total_scenarios": 1,
"passed_scenarios": 1 if result.get("success", False) else 0,
"failed_scenarios": 0 if result.get("success", False) else 1,
"results": [result]
}
report = self.reporter.generate_report(test_results)
self.reporter.print_summary(report)
# 保存報告
if self.config.report_output_dir:
report_path = self.reporter.save_report(report)
debug_log(f"📄 詳細報告已保存: {report_path}")
return result.get("success", False)
async def run_scenarios_by_tags(self, tags: List[str]) -> bool:
"""根據標籤運行測試場景"""
debug_log(f"🏷️ 運行標籤測試: {', '.join(tags)}")
results = await self.scenarios.run_all_scenarios(tags)
# 生成報告
report = self.reporter.generate_report(results)
self.reporter.print_summary(report)
# 保存報告
if self.config.report_output_dir:
report_path = self.reporter.save_report(report)
debug_log(f"📄 詳細報告已保存: {report_path}")
return results.get("success", False)
async def run_all_scenarios(self) -> bool:
"""運行所有測試場景"""
debug_log("🚀 運行所有測試場景")
results = await self.scenarios.run_all_scenarios()
# 生成報告
report = self.reporter.generate_report(results)
self.reporter.print_summary(report)
# 保存報告
if self.config.report_output_dir:
report_path = self.reporter.save_report(report)
debug_log(f"📄 詳細報告已保存: {report_path}")
return results.get("success", False)
def list_scenarios(self, tags: Optional[List[str]] = None):
"""列出可用的測試場景"""
scenarios = self.scenarios.list_scenarios(tags)
print("\n📋 可用的測試場景:")
print("=" * 50)
for scenario in scenarios:
tags_str = f" [{', '.join(scenario.tags)}]" if scenario.tags else ""
print(f"🧪 {scenario.name}{tags_str}")
print(f" {scenario.description}")
print(f" 超時: {scenario.timeout}s")
print()
print(f"總計: {len(scenarios)} 個測試場景")
def create_config_from_args(args) -> TestConfig:
"""從命令行參數創建配置"""
config = TestConfig.from_env()
# 覆蓋命令行參數
if args.timeout:
config.test_timeout = args.timeout
if args.verbose is not None:
config.test_verbose = args.verbose
if args.debug:
config.test_debug = True
os.environ["MCP_DEBUG"] = "true"
if args.report_format:
config.report_format = args.report_format
if args.report_dir:
config.report_output_dir = args.report_dir
if args.project_dir:
config.test_project_dir = args.project_dir
return config
async def main():
"""主函數"""
parser = argparse.ArgumentParser(
description="MCP 增強測試系統",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例用法:
%(prog)s # 運行所有測試
%(prog)s --scenario basic_workflow # 運行特定場景
%(prog)s --tags quick # 運行快速測試
%(prog)s --tags basic,integration # 運行多個標籤
%(prog)s --list # 列出所有場景
%(prog)s --debug --verbose # 調試模式
"""
)
# 測試選項
parser.add_argument(
'--scenario',
help='運行特定的測試場景'
)
parser.add_argument(
'--tags',
help='根據標籤運行測試場景 (逗號分隔)'
)
parser.add_argument(
'--list',
action='store_true',
help='列出所有可用的測試場景'
)
# 配置選項
parser.add_argument(
'--timeout',
type=int,
help='測試超時時間 (秒)'
)
parser.add_argument(
'--verbose',
action='store_true',
help='詳細輸出'
)
parser.add_argument(
'--debug',
action='store_true',
help='調試模式'
)
parser.add_argument(
'--project-dir',
help='測試項目目錄'
)
# 報告選項
parser.add_argument(
'--report-format',
choices=['html', 'json', 'markdown'],
help='報告格式'
)
parser.add_argument(
'--report-dir',
help='報告輸出目錄'
)
args = parser.parse_args()
# 創建配置
config = create_config_from_args(args)
# 創建測試運行器
runner = MCPTestRunner(config)
try:
if args.list:
# 列出測試場景
tags = args.tags.split(',') if args.tags else None
runner.list_scenarios(tags)
return
success = False
if args.scenario:
# 運行特定場景
success = await runner.run_single_scenario(args.scenario)
elif args.tags:
# 根據標籤運行
tags = [tag.strip() for tag in args.tags.split(',')]
success = await runner.run_scenarios_by_tags(tags)
else:
# 運行所有場景
success = await runner.run_all_scenarios()
if success:
debug_log("🎉 所有測試通過!")
sys.exit(0)
else:
debug_log("❌ 部分測試失敗")
sys.exit(1)
except KeyboardInterrupt:
debug_log("\n⚠️ 測試被用戶中斷")
sys.exit(130)
except Exception as e:
debug_log(f"❌ 測試執行失敗: {e}")
if config.test_debug:
import traceback
debug_log(f"詳細錯誤: {traceback.format_exc()}")
sys.exit(1)
def run_quick_test():
"""快速測試入口"""
os.environ["MCP_DEBUG"] = "true"
# 設置快速測試配置
config = TestConfig.from_env()
config.test_timeout = 60
config.report_format = "markdown"
async def quick_test():
runner = MCPTestRunner(config)
return await runner.run_scenarios_by_tags(["quick"])
return asyncio.run(quick_test())
def run_basic_workflow_test():
"""基礎工作流程測試入口"""
os.environ["MCP_DEBUG"] = "true"
config = TestConfig.from_env()
config.test_timeout = 180
async def workflow_test():
runner = MCPTestRunner(config)
return await runner.run_single_scenario("basic_workflow")
return asyncio.run(workflow_test())
if __name__ == "__main__":
asyncio.run(main())

View File

@ -140,7 +140,7 @@ def test_web_ui(keep_running=False):
session_info = { session_info = {
'manager': manager, 'manager': manager,
'session_id': session_id, 'session_id': session_id,
'url': f"http://{manager.host}:{manager.port}/session/{session_id}" 'url': f"http://{manager.host}:{manager.port}" # 使用根路徑
} }
debug_log(f"✅ 測試會話創建成功 (ID: {session_id[:8]}...)") debug_log(f"✅ 測試會話創建成功 (ID: {session_id[:8]}...)")
debug_log(f"🔗 測試 URL: {session_info['url']}") debug_log(f"🔗 測試 URL: {session_info['url']}")
@ -299,16 +299,16 @@ def interactive_demo(session_info):
"""Run interactive demo with the Web UI""" """Run interactive demo with the Web UI"""
debug_log(f"\n🌐 Web UI 互動測試模式") debug_log(f"\n🌐 Web UI 互動測試模式")
debug_log("=" * 50) debug_log("=" * 50)
debug_log(f"服務器地址: http://{session_info['manager'].host}:{session_info['manager'].port}") debug_log(f"服務器地址: {session_info['url']}") # 簡化輸出,只顯示服務器地址
debug_log(f"測試會話: {session_info['url']}")
debug_log("\n📖 操作指南:") debug_log("\n📖 操作指南:")
debug_log(" 1. 在瀏覽器中開啟上面的測試 URL") debug_log(" 1. 在瀏覽器中開啟上面的服務器地址")
debug_log(" 2. 嘗試以下功能:") debug_log(" 2. 嘗試以下功能:")
debug_log(" - 點擊 '顯示命令區塊' 按鈕") debug_log(" - 點擊 '顯示命令區塊' 按鈕")
debug_log(" - 輸入命令如 'echo Hello World' 並執行") debug_log(" - 輸入命令如 'echo Hello World' 並執行")
debug_log(" - 在回饋區域輸入文字") debug_log(" - 在回饋區域輸入文字")
debug_log(" - 使用 Ctrl+Enter 提交回饋") debug_log(" - 使用 Ctrl+Enter 提交回饋")
debug_log(" 3. 測試 WebSocket 即時通訊功能") debug_log(" 3. 測試 WebSocket 即時通訊功能")
debug_log(" 4. 測試頁面持久性(提交反饋後頁面不關閉)")
debug_log("\n⌨️ 控制選項:") debug_log("\n⌨️ 控制選項:")
debug_log(" - 按 Enter 繼續運行") debug_log(" - 按 Enter 繼續運行")
debug_log(" - 輸入 'q''quit' 停止服務器") debug_log(" - 輸入 'q''quit' 停止服務器")

View File

@ -0,0 +1,37 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MCP 測試框架
============
完整的 MCP 測試系統模擬真實的 Cursor IDE 調用場景
主要功能
- MCP 客戶端模擬器
- 完整的回饋循環測試
- 多場景測試覆蓋
- 詳細的測試報告
作者: Augment Agent
創建時間: 2025-01-05
"""
from .mcp_client import MCPTestClient
from .scenarios import TestScenarios
from .validators import TestValidators
from .reporter import TestReporter
from .utils import TestUtils
from .config import TestConfig, DEFAULT_CONFIG
__all__ = [
'MCPTestClient',
'TestScenarios',
'TestValidators',
'TestReporter',
'TestUtils',
'TestConfig',
'DEFAULT_CONFIG'
]
__version__ = "1.0.0"
__author__ = "Augment Agent"

View File

@ -0,0 +1,133 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
測試配置管理
============
管理 MCP 測試框架的配置參數和設定
"""
import os
from dataclasses import dataclass
from typing import Dict, Any, Optional
from pathlib import Path
@dataclass
class TestConfig:
"""測試配置類"""
# 服務器配置
server_host: str = "127.0.0.1"
server_port: int = 8765
server_timeout: int = 30
# MCP 客戶端配置
mcp_timeout: int = 60
mcp_retry_count: int = 3
mcp_retry_delay: float = 1.0
# WebSocket 配置
websocket_timeout: int = 10
websocket_ping_interval: int = 5
websocket_ping_timeout: int = 3
# 測試配置
test_timeout: int = 120
test_parallel: bool = False
test_verbose: bool = True
test_debug: bool = False
# 報告配置
report_format: str = "html" # html, json, markdown
report_output_dir: str = "test_reports"
report_include_logs: bool = True
report_include_performance: bool = True
# 測試數據配置
test_project_dir: Optional[str] = None
test_summary: str = "MCP 測試框架 - 模擬 Cursor IDE 調用"
test_feedback_text: str = "這是一個測試回饋,用於驗證 MCP 系統功能。"
@classmethod
def from_env(cls) -> 'TestConfig':
"""從環境變數創建配置"""
config = cls()
# 從環境變數讀取配置
config.server_host = os.getenv('MCP_TEST_HOST', config.server_host)
config.server_port = int(os.getenv('MCP_TEST_PORT', str(config.server_port)))
config.server_timeout = int(os.getenv('MCP_TEST_SERVER_TIMEOUT', str(config.server_timeout)))
config.mcp_timeout = int(os.getenv('MCP_TEST_TIMEOUT', str(config.mcp_timeout)))
config.mcp_retry_count = int(os.getenv('MCP_TEST_RETRY_COUNT', str(config.mcp_retry_count)))
config.test_timeout = int(os.getenv('MCP_TEST_CASE_TIMEOUT', str(config.test_timeout)))
config.test_parallel = os.getenv('MCP_TEST_PARALLEL', '').lower() in ('true', '1', 'yes')
config.test_verbose = os.getenv('MCP_TEST_VERBOSE', '').lower() not in ('false', '0', 'no')
config.test_debug = os.getenv('MCP_DEBUG', '').lower() in ('true', '1', 'yes')
config.report_format = os.getenv('MCP_TEST_REPORT_FORMAT', config.report_format)
config.report_output_dir = os.getenv('MCP_TEST_REPORT_DIR', config.report_output_dir)
config.test_project_dir = os.getenv('MCP_TEST_PROJECT_DIR', config.test_project_dir)
return config
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'TestConfig':
"""從字典創建配置"""
config = cls()
for key, value in data.items():
if hasattr(config, key):
setattr(config, key, value)
return config
def to_dict(self) -> Dict[str, Any]:
"""轉換為字典"""
return {
'server_host': self.server_host,
'server_port': self.server_port,
'server_timeout': self.server_timeout,
'mcp_timeout': self.mcp_timeout,
'mcp_retry_count': self.mcp_retry_count,
'mcp_retry_delay': self.mcp_retry_delay,
'websocket_timeout': self.websocket_timeout,
'websocket_ping_interval': self.websocket_ping_interval,
'websocket_ping_timeout': self.websocket_ping_timeout,
'test_timeout': self.test_timeout,
'test_parallel': self.test_parallel,
'test_verbose': self.test_verbose,
'test_debug': self.test_debug,
'report_format': self.report_format,
'report_output_dir': self.report_output_dir,
'report_include_logs': self.report_include_logs,
'report_include_performance': self.report_include_performance,
'test_project_dir': self.test_project_dir,
'test_summary': self.test_summary,
'test_feedback_text': self.test_feedback_text
}
def get_server_url(self) -> str:
"""獲取服務器 URL"""
return f"http://{self.server_host}:{self.server_port}"
def get_websocket_url(self) -> str:
"""獲取 WebSocket URL"""
return f"ws://{self.server_host}:{self.server_port}/ws"
def get_report_output_path(self) -> Path:
"""獲取報告輸出路徑"""
return Path(self.report_output_dir)
def ensure_report_dir(self) -> Path:
"""確保報告目錄存在"""
report_dir = self.get_report_output_path()
report_dir.mkdir(parents=True, exist_ok=True)
return report_dir
# 默認配置實例
DEFAULT_CONFIG = TestConfig.from_env()

View File

@ -0,0 +1,527 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MCP 客戶端模擬器
================
模擬 Cursor IDE 作為 MCP 客戶端的完整調用流程實現標準的 JSON-RPC 2.0 通信協議
主要功能
- MCP 協議握手和初始化
- 工具發現和能力協商
- 工具調用和結果處理
- 錯誤處理和重連機制
"""
import asyncio
import json
import uuid
import time
import subprocess
import signal
import os
from typing import Dict, Any, Optional, List, Callable, Awaitable
from pathlib import Path
from dataclasses import dataclass, field
from .config import TestConfig, DEFAULT_CONFIG
from .utils import TestUtils, PerformanceMonitor, AsyncEventWaiter
from ..debug import debug_log
@dataclass
class MCPMessage:
"""MCP 消息類"""
jsonrpc: str = "2.0"
id: Optional[str] = None
method: Optional[str] = None
params: Optional[Dict[str, Any]] = None
result: Optional[Any] = None
error: Optional[Dict[str, Any]] = None
def to_dict(self) -> Dict[str, Any]:
"""轉換為字典"""
data = {"jsonrpc": self.jsonrpc}
if self.id is not None:
data["id"] = self.id
if self.method is not None:
data["method"] = self.method
if self.params is not None:
data["params"] = self.params
if self.result is not None:
data["result"] = self.result
if self.error is not None:
data["error"] = self.error
return data
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'MCPMessage':
"""從字典創建"""
return cls(
jsonrpc=data.get("jsonrpc", "2.0"),
id=data.get("id"),
method=data.get("method"),
params=data.get("params"),
result=data.get("result"),
error=data.get("error")
)
def is_request(self) -> bool:
"""是否為請求消息"""
return self.method is not None
def is_response(self) -> bool:
"""是否為響應消息"""
return self.result is not None or self.error is not None
def is_notification(self) -> bool:
"""是否為通知消息"""
return self.method is not None and self.id is None
@dataclass
class MCPClientState:
"""MCP 客戶端狀態"""
connected: bool = False
initialized: bool = False
tools_discovered: bool = False
available_tools: List[Dict[str, Any]] = field(default_factory=list)
server_capabilities: Dict[str, Any] = field(default_factory=dict)
client_info: Dict[str, Any] = field(default_factory=dict)
server_info: Dict[str, Any] = field(default_factory=dict)
class MCPTestClient:
"""MCP 測試客戶端"""
def __init__(self, config: Optional[TestConfig] = None):
self.config = config or DEFAULT_CONFIG
self.state = MCPClientState()
self.process: Optional[subprocess.Popen] = None
self.event_waiter = AsyncEventWaiter()
self.performance_monitor = PerformanceMonitor()
self.message_id_counter = 0
self.pending_requests: Dict[str, asyncio.Future] = {}
self.message_handlers: Dict[str, Callable] = {}
# 設置默認消息處理器
self._setup_default_handlers()
def _setup_default_handlers(self):
"""設置默認消息處理器"""
self.message_handlers.update({
'initialize': self._handle_initialize_response,
'tools/list': self._handle_tools_list_response,
'tools/call': self._handle_tools_call_response,
})
def _generate_message_id(self) -> str:
"""生成消息 ID"""
self.message_id_counter += 1
return f"msg_{self.message_id_counter}_{uuid.uuid4().hex[:8]}"
async def start_server(self) -> bool:
"""啟動 MCP 服務器"""
try:
debug_log("🚀 啟動 MCP 服務器...")
self.performance_monitor.start()
# 構建啟動命令
cmd = [
"python", "-m", "src.mcp_feedback_enhanced", "server"
]
# 設置環境變數
env = os.environ.copy()
env.update({
"MCP_DEBUG": "true" if self.config.test_debug else "false",
"PYTHONPATH": str(Path(__file__).parent.parent.parent.parent)
})
# 啟動進程
self.process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
env=env,
bufsize=0
)
debug_log(f"✅ MCP 服務器進程已啟動 (PID: {self.process.pid})")
# 等待服務器初始化
await asyncio.sleep(2)
# 檢查進程是否仍在運行
if self.process.poll() is not None:
stderr_output = self.process.stderr.read() if self.process.stderr else ""
raise RuntimeError(f"MCP 服務器啟動失敗: {stderr_output}")
self.state.connected = True
self.performance_monitor.checkpoint("server_started")
return True
except Exception as e:
debug_log(f"❌ 啟動 MCP 服務器失敗: {e}")
await self.cleanup()
return False
async def stop_server(self):
"""停止 MCP 服務器"""
if self.process:
try:
debug_log("🛑 停止 MCP 服務器...")
# 嘗試優雅關閉
self.process.terminate()
try:
await asyncio.wait_for(
asyncio.create_task(self._wait_for_process()),
timeout=5.0
)
except asyncio.TimeoutError:
debug_log("⚠️ 優雅關閉超時,強制終止進程")
self.process.kill()
await self._wait_for_process()
debug_log("✅ MCP 服務器已停止")
except Exception as e:
debug_log(f"⚠️ 停止 MCP 服務器時發生錯誤: {e}")
finally:
self.process = None
self.state.connected = False
async def _wait_for_process(self):
"""等待進程結束"""
if self.process:
while self.process.poll() is None:
await asyncio.sleep(0.1)
async def send_message(self, message: MCPMessage) -> Optional[MCPMessage]:
"""發送 MCP 消息"""
if not self.process or not self.state.connected:
raise RuntimeError("MCP 服務器未連接")
try:
# 序列化消息
message_data = json.dumps(message.to_dict()) + "\n"
debug_log(f"📤 發送 MCP 消息: {message.method or 'response'}")
if self.config.test_debug:
debug_log(f" 內容: {message_data.strip()}")
# 發送消息
self.process.stdin.write(message_data)
self.process.stdin.flush()
# 如果是請求,等待響應
if message.is_request() and message.id:
future = asyncio.Future()
self.pending_requests[message.id] = future
try:
response = await asyncio.wait_for(
future,
timeout=self.config.mcp_timeout
)
return response
except asyncio.TimeoutError:
self.pending_requests.pop(message.id, None)
raise TimeoutError(f"MCP 請求超時: {message.method}")
return None
except Exception as e:
debug_log(f"❌ 發送 MCP 消息失敗: {e}")
raise
async def read_messages(self):
"""讀取 MCP 消息"""
if not self.process:
return
try:
while self.process and self.process.poll() is None:
# 讀取一行
line = await asyncio.create_task(self._read_line())
if not line:
continue
try:
# 解析 JSON
data = json.loads(line.strip())
message = MCPMessage.from_dict(data)
debug_log(f"📨 收到 MCP 消息: {message.method or 'response'}")
if self.config.test_debug:
debug_log(f" 內容: {line.strip()}")
# 處理消息
await self._handle_message(message)
except json.JSONDecodeError as e:
debug_log(f"⚠️ JSON 解析失敗: {e}, 原始數據: {line}")
except Exception as e:
debug_log(f"❌ 處理消息失敗: {e}")
except Exception as e:
debug_log(f"❌ 讀取 MCP 消息失敗: {e}")
async def _read_line(self) -> str:
"""異步讀取一行"""
if not self.process or not self.process.stdout:
return ""
# 使用線程池執行阻塞的讀取操作
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self.process.stdout.readline)
async def _handle_message(self, message: MCPMessage):
"""處理收到的消息"""
if message.is_response() and message.id:
# 處理響應
future = self.pending_requests.pop(message.id, None)
if future and not future.done():
future.set_result(message)
elif message.is_request():
# 處理請求(通常是服務器發起的)
debug_log(f"收到服務器請求: {message.method}")
# 調用特定的消息處理器
if message.method in self.message_handlers:
await self.message_handlers[message.method](message)
async def _handle_initialize_response(self, message: MCPMessage):
"""處理初始化響應"""
if message.result:
self.state.server_info = message.result.get('serverInfo', {})
self.state.server_capabilities = message.result.get('capabilities', {})
self.state.initialized = True
debug_log("✅ MCP 初始化完成")
async def _handle_tools_list_response(self, message: MCPMessage):
"""處理工具列表響應"""
if message.result and 'tools' in message.result:
self.state.available_tools = message.result['tools']
self.state.tools_discovered = True
debug_log(f"✅ 發現 {len(self.state.available_tools)} 個工具")
async def _handle_tools_call_response(self, message: MCPMessage):
"""處理工具調用響應"""
if message.result:
debug_log("✅ 工具調用完成")
elif message.error:
debug_log(f"❌ 工具調用失敗: {message.error}")
async def initialize(self) -> bool:
"""初始化 MCP 連接"""
try:
debug_log("🔄 初始化 MCP 連接...")
message = MCPMessage(
id=self._generate_message_id(),
method="initialize",
params={
"protocolVersion": "2024-11-05",
"clientInfo": {
"name": "mcp-test-client",
"version": "1.0.0"
},
"capabilities": {
"roots": {
"listChanged": True
},
"sampling": {}
}
}
)
response = await self.send_message(message)
if response and response.result:
self.performance_monitor.checkpoint("initialized")
return True
else:
debug_log(f"❌ 初始化失敗: {response.error if response else '無響應'}")
return False
except Exception as e:
debug_log(f"❌ 初始化異常: {e}")
return False
async def list_tools(self) -> List[Dict[str, Any]]:
"""獲取可用工具列表"""
try:
debug_log("🔍 獲取工具列表...")
message = MCPMessage(
id=self._generate_message_id(),
method="tools/list",
params={}
)
response = await self.send_message(message)
if response and response.result and 'tools' in response.result:
tools = response.result['tools']
debug_log(f"✅ 獲取到 {len(tools)} 個工具")
self.performance_monitor.checkpoint("tools_listed", {"tools_count": len(tools)})
return tools
else:
debug_log(f"❌ 獲取工具列表失敗: {response.error if response else '無響應'}")
return []
except Exception as e:
debug_log(f"❌ 獲取工具列表異常: {e}")
return []
async def call_interactive_feedback(self, project_directory: str, summary: str,
timeout: int = 60) -> Dict[str, Any]:
"""調用互動回饋工具"""
try:
debug_log("🎯 調用互動回饋工具...")
message = MCPMessage(
id=self._generate_message_id(),
method="tools/call",
params={
"name": "interactive_feedback",
"arguments": {
"project_directory": project_directory,
"summary": summary,
"timeout": timeout
}
}
)
# 設置較長的超時時間,因為需要等待用戶互動
old_timeout = self.config.mcp_timeout
self.config.mcp_timeout = timeout + 30 # 額外 30 秒緩衝
try:
response = await self.send_message(message)
if response and response.result:
result = response.result
debug_log("✅ 互動回饋工具調用成功")
self.performance_monitor.checkpoint("interactive_feedback_completed")
return result
else:
error_msg = response.error if response else "無響應"
debug_log(f"❌ 互動回饋工具調用失敗: {error_msg}")
return {"error": str(error_msg)}
finally:
self.config.mcp_timeout = old_timeout
except Exception as e:
debug_log(f"❌ 互動回饋工具調用異常: {e}")
return {"error": str(e)}
async def full_workflow_test(self, project_directory: Optional[str] = None,
summary: Optional[str] = None) -> Dict[str, Any]:
"""執行完整的工作流程測試"""
try:
debug_log("🚀 開始完整工作流程測試...")
self.performance_monitor.start()
# 使用配置中的默認值
project_dir = project_directory or self.config.test_project_dir or str(Path.cwd())
test_summary = summary or self.config.test_summary
results = {
"success": False,
"steps": {},
"performance": {},
"errors": []
}
# 步驟 1: 啟動服務器
if not await self.start_server():
results["errors"].append("服務器啟動失敗")
return results
results["steps"]["server_started"] = True
# 啟動消息讀取任務
read_task = asyncio.create_task(self.read_messages())
try:
# 步驟 2: 初始化連接
if not await self.initialize():
results["errors"].append("MCP 初始化失敗")
return results
results["steps"]["initialized"] = True
# 步驟 3: 獲取工具列表
tools = await self.list_tools()
if not tools:
results["errors"].append("獲取工具列表失敗")
return results
results["steps"]["tools_discovered"] = True
results["tools_count"] = len(tools)
# 檢查是否有 interactive_feedback 工具
has_interactive_tool = any(
tool.get("name") == "interactive_feedback"
for tool in tools
)
if not has_interactive_tool:
results["errors"].append("未找到 interactive_feedback 工具")
return results
# 步驟 4: 調用互動回饋工具
feedback_result = await self.call_interactive_feedback(
project_dir, test_summary, self.config.test_timeout
)
if "error" in feedback_result:
results["errors"].append(f"互動回饋調用失敗: {feedback_result['error']}")
return results
results["steps"]["interactive_feedback_called"] = True
results["feedback_result"] = feedback_result
results["success"] = True
debug_log("🎉 完整工作流程測試成功完成")
finally:
read_task.cancel()
try:
await read_task
except asyncio.CancelledError:
pass
return results
except Exception as e:
debug_log(f"❌ 完整工作流程測試異常: {e}")
results["errors"].append(f"測試異常: {str(e)}")
return results
finally:
# 獲取性能數據
self.performance_monitor.stop()
results["performance"] = self.performance_monitor.get_summary()
# 清理資源
await self.cleanup()
async def cleanup(self):
"""清理資源"""
await self.stop_server()
# 取消所有待處理的請求
for future in self.pending_requests.values():
if not future.done():
future.cancel()
self.pending_requests.clear()
self.performance_monitor.stop()
debug_log("🧹 MCP 客戶端資源已清理")

View File

@ -0,0 +1,447 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
測試報告生成器
==============
生成詳細的 MCP 測試報告支持多種格式輸出
"""
import json
import time
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional
from pathlib import Path
from dataclasses import dataclass, asdict
from .config import TestConfig, DEFAULT_CONFIG
from .utils import TestUtils
from .validators import TestValidators, ValidationResult
from ..debug import debug_log
@dataclass
class TestReport:
"""測試報告數據結構"""
timestamp: str
duration: float
total_scenarios: int
passed_scenarios: int
failed_scenarios: int
success_rate: float
scenarios: List[Dict[str, Any]]
validation_summary: Dict[str, Any]
performance_summary: Dict[str, Any]
system_info: Dict[str, Any]
config: Dict[str, Any]
errors: List[str]
warnings: List[str]
class TestReporter:
"""測試報告生成器"""
def __init__(self, config: Optional[TestConfig] = None):
self.config = config or DEFAULT_CONFIG
self.validators = TestValidators(config)
def generate_report(self, test_results: Dict[str, Any]) -> TestReport:
"""生成測試報告"""
start_time = time.time()
# 提取基本信息
scenarios = test_results.get("results", [])
total_scenarios = test_results.get("total_scenarios", len(scenarios))
passed_scenarios = test_results.get("passed_scenarios", 0)
failed_scenarios = test_results.get("failed_scenarios", 0)
# 計算成功率
success_rate = passed_scenarios / total_scenarios if total_scenarios > 0 else 0
# 驗證測試結果
validation_results = {}
for i, scenario in enumerate(scenarios):
validation_results[f"scenario_{i}"] = self.validators.result_validator.validate_test_result(scenario)
validation_summary = self.validators.get_validation_summary(validation_results)
# 生成性能摘要
performance_summary = self._generate_performance_summary(scenarios)
# 收集錯誤和警告
all_errors = []
all_warnings = []
for scenario in scenarios:
all_errors.extend(scenario.get("errors", []))
# 計算總持續時間
total_duration = 0
for scenario in scenarios:
perf = scenario.get("performance", {})
duration = perf.get("total_duration", 0) or perf.get("total_time", 0)
total_duration += duration
# 創建報告
report = TestReport(
timestamp=datetime.now().isoformat(),
duration=total_duration,
total_scenarios=total_scenarios,
passed_scenarios=passed_scenarios,
failed_scenarios=failed_scenarios,
success_rate=success_rate,
scenarios=scenarios,
validation_summary=validation_summary,
performance_summary=performance_summary,
system_info=TestUtils.get_system_info(),
config=self.config.to_dict(),
errors=all_errors,
warnings=all_warnings
)
debug_log(f"📊 測試報告生成完成 (耗時: {time.time() - start_time:.2f}s)")
return report
def _generate_performance_summary(self, scenarios: List[Dict[str, Any]]) -> Dict[str, Any]:
"""生成性能摘要"""
total_duration = 0
min_duration = float('inf')
max_duration = 0
durations = []
memory_usage = []
for scenario in scenarios:
perf = scenario.get("performance", {})
# 處理持續時間
duration = perf.get("total_duration", 0) or perf.get("total_time", 0)
if duration > 0:
total_duration += duration
min_duration = min(min_duration, duration)
max_duration = max(max_duration, duration)
durations.append(duration)
# 處理內存使用
memory_diff = perf.get("memory_diff", {})
if memory_diff:
memory_usage.append(memory_diff)
# 計算平均值
avg_duration = total_duration / len(durations) if durations else 0
# 計算中位數
if durations:
sorted_durations = sorted(durations)
n = len(sorted_durations)
median_duration = (
sorted_durations[n // 2] if n % 2 == 1
else (sorted_durations[n // 2 - 1] + sorted_durations[n // 2]) / 2
)
else:
median_duration = 0
return {
"total_duration": total_duration,
"total_duration_formatted": TestUtils.format_duration(total_duration),
"avg_duration": avg_duration,
"avg_duration_formatted": TestUtils.format_duration(avg_duration),
"median_duration": median_duration,
"median_duration_formatted": TestUtils.format_duration(median_duration),
"min_duration": min_duration if min_duration != float('inf') else 0,
"min_duration_formatted": TestUtils.format_duration(min_duration if min_duration != float('inf') else 0),
"max_duration": max_duration,
"max_duration_formatted": TestUtils.format_duration(max_duration),
"scenarios_with_performance": len(durations),
"memory_usage_samples": len(memory_usage)
}
def save_report(self, report: TestReport, output_path: Optional[Path] = None) -> Path:
"""保存測試報告"""
if output_path is None:
output_dir = self.config.ensure_report_dir()
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"mcp_test_report_{timestamp}.{self.config.report_format}"
output_path = output_dir / filename
output_path.parent.mkdir(parents=True, exist_ok=True)
if self.config.report_format.lower() == "json":
self._save_json_report(report, output_path)
elif self.config.report_format.lower() == "html":
self._save_html_report(report, output_path)
elif self.config.report_format.lower() == "markdown":
self._save_markdown_report(report, output_path)
else:
raise ValueError(f"不支持的報告格式: {self.config.report_format}")
debug_log(f"📄 測試報告已保存: {output_path}")
return output_path
def _save_json_report(self, report: TestReport, output_path: Path):
"""保存 JSON 格式報告"""
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(asdict(report), f, indent=2, ensure_ascii=False, default=str)
def _save_html_report(self, report: TestReport, output_path: Path):
"""保存 HTML 格式報告"""
html_content = self._generate_html_report(report)
with open(output_path, 'w', encoding='utf-8') as f:
f.write(html_content)
def _save_markdown_report(self, report: TestReport, output_path: Path):
"""保存 Markdown 格式報告"""
markdown_content = self._generate_markdown_report(report)
with open(output_path, 'w', encoding='utf-8') as f:
f.write(markdown_content)
def _generate_html_report(self, report: TestReport) -> str:
"""生成 HTML 報告"""
# 狀態圖標
status_icon = "" if report.success_rate == 1.0 else "" if report.success_rate == 0 else "⚠️"
# 性能圖表數據(簡化版)
scenario_names = [s.get("scenario_name", f"Scenario {i}") for i, s in enumerate(report.scenarios)]
scenario_durations = []
for s in report.scenarios:
perf = s.get("performance", {})
duration = perf.get("total_duration", 0) or perf.get("total_time", 0)
scenario_durations.append(duration)
html = f"""
<!DOCTYPE html>
<html lang="zh-TW">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>MCP 測試報告</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; background: #f5f5f5; }}
.container {{ max-width: 1200px; margin: 0 auto; background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); }}
.header {{ text-align: center; margin-bottom: 30px; }}
.status {{ font-size: 24px; margin: 10px 0; }}
.success {{ color: #28a745; }}
.warning {{ color: #ffc107; }}
.error {{ color: #dc3545; }}
.summary {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 20px; margin: 20px 0; }}
.card {{ background: #f8f9fa; padding: 15px; border-radius: 6px; border-left: 4px solid #007bff; }}
.card h3 {{ margin: 0 0 10px 0; color: #333; }}
.card .value {{ font-size: 24px; font-weight: bold; color: #007bff; }}
.scenarios {{ margin: 20px 0; }}
.scenario {{ background: #f8f9fa; margin: 10px 0; padding: 15px; border-radius: 6px; border-left: 4px solid #28a745; }}
.scenario.failed {{ border-left-color: #dc3545; }}
.scenario h4 {{ margin: 0 0 10px 0; }}
.scenario-details {{ display: grid; grid-template-columns: 1fr 1fr; gap: 10px; font-size: 14px; }}
.errors {{ background: #f8d7da; color: #721c24; padding: 10px; border-radius: 4px; margin: 10px 0; }}
.performance {{ margin: 20px 0; }}
.footer {{ text-align: center; margin-top: 30px; color: #666; font-size: 12px; }}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>🧪 MCP 測試報告</h1>
<div class="status {'success' if report.success_rate == 1.0 else 'warning' if report.success_rate > 0 else 'error'}">
{status_icon} 測試完成
</div>
<p>生成時間: {report.timestamp}</p>
</div>
<div class="summary">
<div class="card">
<h3>總測試數</h3>
<div class="value">{report.total_scenarios}</div>
</div>
<div class="card">
<h3>通過測試</h3>
<div class="value" style="color: #28a745;">{report.passed_scenarios}</div>
</div>
<div class="card">
<h3>失敗測試</h3>
<div class="value" style="color: #dc3545;">{report.failed_scenarios}</div>
</div>
<div class="card">
<h3>成功率</h3>
<div class="value">{report.success_rate:.1%}</div>
</div>
<div class="card">
<h3>總耗時</h3>
<div class="value">{report.performance_summary.get('total_duration_formatted', 'N/A')}</div>
</div>
<div class="card">
<h3>平均耗時</h3>
<div class="value">{report.performance_summary.get('avg_duration_formatted', 'N/A')}</div>
</div>
</div>
<div class="scenarios">
<h2>📋 測試場景詳情</h2>
"""
for i, scenario in enumerate(report.scenarios):
success = scenario.get("success", False)
scenario_name = scenario.get("scenario_name", f"Scenario {i+1}")
scenario_desc = scenario.get("scenario_description", "無描述")
perf = scenario.get("performance", {})
duration = perf.get("total_duration", 0) or perf.get("total_time", 0)
duration_str = TestUtils.format_duration(duration) if duration > 0 else "N/A"
steps = scenario.get("steps", {})
completed_steps = sum(1 for v in steps.values() if v)
total_steps = len(steps)
errors = scenario.get("errors", [])
html += f"""
<div class="scenario {'failed' if not success else ''}">
<h4>{'' if success else ''} {scenario_name}</h4>
<p>{scenario_desc}</p>
<div class="scenario-details">
<div><strong>狀態:</strong> {'通過' if success else '失敗'}</div>
<div><strong>耗時:</strong> {duration_str}</div>
<div><strong>完成步驟:</strong> {completed_steps}/{total_steps}</div>
<div><strong>錯誤數:</strong> {len(errors)}</div>
</div>
"""
if errors:
html += '<div class="errors"><strong>錯誤信息:</strong><ul>'
for error in errors:
html += f'<li>{error}</li>'
html += '</ul></div>'
html += '</div>'
html += f"""
</div>
<div class="performance">
<h2>📊 性能統計</h2>
<div class="summary">
<div class="card">
<h3>最快測試</h3>
<div class="value">{report.performance_summary.get('min_duration_formatted', 'N/A')}</div>
</div>
<div class="card">
<h3>最慢測試</h3>
<div class="value">{report.performance_summary.get('max_duration_formatted', 'N/A')}</div>
</div>
<div class="card">
<h3>中位數</h3>
<div class="value">{report.performance_summary.get('median_duration_formatted', 'N/A')}</div>
</div>
</div>
</div>
<div class="footer">
<p>MCP Feedback Enhanced 測試框架 | 生成時間: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
</div>
</div>
</body>
</html>
"""
return html
def _generate_markdown_report(self, report: TestReport) -> str:
"""生成 Markdown 報告"""
status_icon = "" if report.success_rate == 1.0 else "" if report.success_rate == 0 else "⚠️"
md = f"""# 🧪 MCP 測試報告
{status_icon} **測試狀態**: {'全部通過' if report.success_rate == 1.0 else '部分失敗' if report.success_rate > 0 else '全部失敗'}
**生成時間**: {report.timestamp}
## 📊 測試摘要
| 指標 | 數值 |
|------|------|
| 總測試數 | {report.total_scenarios} |
| 通過測試 | {report.passed_scenarios} |
| 失敗測試 | {report.failed_scenarios} |
| 成功率 | {report.success_rate:.1%} |
| 總耗時 | {report.performance_summary.get('total_duration_formatted', 'N/A')} |
| 平均耗時 | {report.performance_summary.get('avg_duration_formatted', 'N/A')} |
## 📋 測試場景詳情
"""
for i, scenario in enumerate(report.scenarios):
success = scenario.get("success", False)
scenario_name = scenario.get("scenario_name", f"Scenario {i+1}")
scenario_desc = scenario.get("scenario_description", "無描述")
perf = scenario.get("performance", {})
duration = perf.get("total_duration", 0) or perf.get("total_time", 0)
duration_str = TestUtils.format_duration(duration) if duration > 0 else "N/A"
steps = scenario.get("steps", {})
completed_steps = sum(1 for v in steps.values() if v)
total_steps = len(steps)
errors = scenario.get("errors", [])
md += f"""### {'' if success else ''} {scenario_name}
**描述**: {scenario_desc}
- **狀態**: {'通過' if success else '失敗'}
- **耗時**: {duration_str}
- **完成步驟**: {completed_steps}/{total_steps}
- **錯誤數**: {len(errors)}
"""
if errors:
md += "**錯誤信息**:\n"
for error in errors:
md += f"- {error}\n"
md += "\n"
md += f"""## 📊 性能統計
| 指標 | 數值 |
|------|------|
| 最快測試 | {report.performance_summary.get('min_duration_formatted', 'N/A')} |
| 最慢測試 | {report.performance_summary.get('max_duration_formatted', 'N/A')} |
| 中位數 | {report.performance_summary.get('median_duration_formatted', 'N/A')} |
## 🔧 系統信息
| 項目 | |
|------|---|
| CPU 核心數 | {report.system_info.get('cpu_count', 'N/A')} |
| 總內存 | {report.system_info.get('memory_total', 'N/A')} |
| 可用內存 | {report.system_info.get('memory_available', 'N/A')} |
---
*報告由 MCP Feedback Enhanced 測試框架生成 | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*
"""
return md
def print_summary(self, report: TestReport):
"""打印測試摘要到控制台"""
status_icon = "" if report.success_rate == 1.0 else "" if report.success_rate == 0 else "⚠️"
print("\n" + "="*60)
print(f"🧪 MCP 測試報告摘要 {status_icon}")
print("="*60)
print(f"📊 總測試數: {report.total_scenarios}")
print(f"✅ 通過測試: {report.passed_scenarios}")
print(f"❌ 失敗測試: {report.failed_scenarios}")
print(f"📈 成功率: {report.success_rate:.1%}")
print(f"⏱️ 總耗時: {report.performance_summary.get('total_duration_formatted', 'N/A')}")
print(f"⚡ 平均耗時: {report.performance_summary.get('avg_duration_formatted', 'N/A')}")
if report.errors:
print(f"\n❌ 發現 {len(report.errors)} 個錯誤:")
for error in report.errors[:5]: # 只顯示前5個錯誤
print(f"{error}")
if len(report.errors) > 5:
print(f" ... 還有 {len(report.errors) - 5} 個錯誤")
print("="*60)

View File

@ -0,0 +1,469 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
測試場景定義
============
定義各種 MCP 測試場景包括正常流程錯誤處理性能測試等
"""
import asyncio
import time
import random
from typing import Dict, Any, List, Optional, Callable, Awaitable
from dataclasses import dataclass, field
from pathlib import Path
from .mcp_client import MCPTestClient
from .config import TestConfig, DEFAULT_CONFIG
from .utils import TestUtils, PerformanceMonitor, performance_context
from ..debug import debug_log
@dataclass
class TestScenario:
"""測試場景類"""
name: str
description: str
timeout: int = 120
retry_count: int = 1
parallel: bool = False
tags: List[str] = field(default_factory=list)
setup: Optional[Callable] = None
teardown: Optional[Callable] = None
async def run(self, client: MCPTestClient) -> Dict[str, Any]:
"""運行測試場景"""
raise NotImplementedError
class BasicWorkflowScenario(TestScenario):
"""基礎工作流程測試場景"""
def __init__(self):
super().__init__(
name="basic_workflow",
description="測試基本的 MCP 工作流程:初始化 -> 工具發現 -> 工具調用",
timeout=180,
tags=["basic", "workflow", "integration"]
)
async def run(self, client: MCPTestClient) -> Dict[str, Any]:
"""運行基礎工作流程測試"""
async with performance_context("basic_workflow") as monitor:
result = await client.full_workflow_test()
# 添加額外的驗證
if result["success"]:
# 檢查必要的步驟是否完成
required_steps = ["server_started", "initialized", "tools_discovered", "interactive_feedback_called"]
missing_steps = [step for step in required_steps if not result["steps"].get(step, False)]
if missing_steps:
result["success"] = False
result["errors"].append(f"缺少必要步驟: {missing_steps}")
return result
class QuickConnectionScenario(TestScenario):
"""快速連接測試場景"""
def __init__(self):
super().__init__(
name="quick_connection",
description="測試 MCP 服務器的快速啟動和連接",
timeout=30,
tags=["quick", "connection", "startup"]
)
async def run(self, client: MCPTestClient) -> Dict[str, Any]:
"""運行快速連接測試"""
result = {
"success": False,
"steps": {},
"performance": {},
"errors": []
}
try:
start_time = time.time()
# 啟動服務器
if not await client.start_server():
result["errors"].append("服務器啟動失敗")
return result
result["steps"]["server_started"] = True
# 啟動消息讀取
read_task = asyncio.create_task(client.read_messages())
try:
# 初始化連接
if not await client.initialize():
result["errors"].append("初始化失敗")
return result
result["steps"]["initialized"] = True
# 獲取工具列表
tools = await client.list_tools()
if not tools:
result["errors"].append("工具列表為空")
return result
result["steps"]["tools_discovered"] = True
end_time = time.time()
result["performance"]["total_time"] = end_time - start_time
result["performance"]["tools_count"] = len(tools)
result["success"] = True
finally:
read_task.cancel()
try:
await read_task
except asyncio.CancelledError:
pass
except Exception as e:
result["errors"].append(f"測試異常: {str(e)}")
finally:
await client.cleanup()
return result
class TimeoutHandlingScenario(TestScenario):
"""超時處理測試場景"""
def __init__(self):
super().__init__(
name="timeout_handling",
description="測試超時情況下的處理機制",
timeout=60,
tags=["timeout", "error_handling", "resilience"]
)
async def run(self, client: MCPTestClient) -> Dict[str, Any]:
"""運行超時處理測試"""
result = {
"success": False,
"steps": {},
"performance": {},
"errors": []
}
try:
# 設置很短的超時時間來觸發超時
original_timeout = client.config.mcp_timeout
client.config.mcp_timeout = 5 # 5 秒超時
# 啟動服務器
if not await client.start_server():
result["errors"].append("服務器啟動失敗")
return result
result["steps"]["server_started"] = True
# 啟動消息讀取
read_task = asyncio.create_task(client.read_messages())
try:
# 初始化連接
if not await client.initialize():
result["errors"].append("初始化失敗")
return result
result["steps"]["initialized"] = True
# 嘗試調用互動回饋工具(應該超時)
feedback_result = await client.call_interactive_feedback(
str(Path.cwd()),
"超時測試 - 這個調用應該會超時",
timeout=10 # 10 秒超時,但 MCP 客戶端設置為 5 秒
)
# 檢查是否正確處理了超時
if "error" in feedback_result:
result["steps"]["timeout_handled"] = True
result["success"] = True
debug_log("✅ 超時處理測試成功")
else:
result["errors"].append("未正確處理超時情況")
finally:
read_task.cancel()
try:
await read_task
except asyncio.CancelledError:
pass
# 恢復原始超時設置
client.config.mcp_timeout = original_timeout
except Exception as e:
result["errors"].append(f"測試異常: {str(e)}")
finally:
await client.cleanup()
return result
class ConcurrentCallsScenario(TestScenario):
"""並發調用測試場景"""
def __init__(self):
super().__init__(
name="concurrent_calls",
description="測試並發 MCP 調用的處理能力",
timeout=300,
parallel=True,
tags=["concurrent", "performance", "stress"]
)
async def run(self, client: MCPTestClient) -> Dict[str, Any]:
"""運行並發調用測試"""
result = {
"success": False,
"steps": {},
"performance": {},
"errors": []
}
try:
# 啟動服務器
if not await client.start_server():
result["errors"].append("服務器啟動失敗")
return result
result["steps"]["server_started"] = True
# 啟動消息讀取
read_task = asyncio.create_task(client.read_messages())
try:
# 初始化連接
if not await client.initialize():
result["errors"].append("初始化失敗")
return result
result["steps"]["initialized"] = True
# 並發獲取工具列表
concurrent_count = 5
tasks = []
for i in range(concurrent_count):
task = asyncio.create_task(client.list_tools())
tasks.append(task)
start_time = time.time()
results = await asyncio.gather(*tasks, return_exceptions=True)
end_time = time.time()
# 分析結果
successful_calls = 0
failed_calls = 0
for i, res in enumerate(results):
if isinstance(res, Exception):
failed_calls += 1
debug_log(f"並發調用 {i+1} 失敗: {res}")
elif isinstance(res, list) and len(res) > 0:
successful_calls += 1
else:
failed_calls += 1
result["performance"]["concurrent_count"] = concurrent_count
result["performance"]["successful_calls"] = successful_calls
result["performance"]["failed_calls"] = failed_calls
result["performance"]["total_time"] = end_time - start_time
result["performance"]["avg_time_per_call"] = (end_time - start_time) / concurrent_count
# 判斷成功條件:至少 80% 的調用成功
success_rate = successful_calls / concurrent_count
if success_rate >= 0.8:
result["success"] = True
result["steps"]["concurrent_calls_handled"] = True
debug_log(f"✅ 並發調用測試成功 (成功率: {success_rate:.1%})")
else:
result["errors"].append(f"並發調用成功率過低: {success_rate:.1%}")
finally:
read_task.cancel()
try:
await read_task
except asyncio.CancelledError:
pass
except Exception as e:
result["errors"].append(f"測試異常: {str(e)}")
finally:
await client.cleanup()
return result
class MockTestScenario(TestScenario):
"""模擬測試場景(用於演示)"""
def __init__(self):
super().__init__(
name="mock_test",
description="模擬測試場景,用於演示測試框架功能",
timeout=10,
tags=["mock", "demo", "quick"]
)
async def run(self, client: MCPTestClient) -> Dict[str, Any]:
"""運行模擬測試"""
result = {
"success": True,
"steps": {
"mock_step_1": True,
"mock_step_2": True,
"mock_step_3": True
},
"performance": {
"total_duration": 0.5,
"total_time": 0.5
},
"errors": []
}
# 模擬一些處理時間
await asyncio.sleep(0.5)
debug_log("✅ 模擬測試完成")
return result
class TestScenarios:
"""測試場景管理器"""
def __init__(self, config: Optional[TestConfig] = None):
self.config = config or DEFAULT_CONFIG
self.scenarios: Dict[str, TestScenario] = {}
self._register_default_scenarios()
def _register_default_scenarios(self):
"""註冊默認測試場景"""
scenarios = [
MockTestScenario(), # 添加模擬測試場景
BasicWorkflowScenario(),
QuickConnectionScenario(),
TimeoutHandlingScenario(),
ConcurrentCallsScenario(),
]
for scenario in scenarios:
self.scenarios[scenario.name] = scenario
def register_scenario(self, scenario: TestScenario):
"""註冊自定義測試場景"""
self.scenarios[scenario.name] = scenario
def get_scenario(self, name: str) -> Optional[TestScenario]:
"""獲取測試場景"""
return self.scenarios.get(name)
def list_scenarios(self, tags: Optional[List[str]] = None) -> List[TestScenario]:
"""列出測試場景"""
scenarios = list(self.scenarios.values())
if tags:
scenarios = [
scenario for scenario in scenarios
if any(tag in scenario.tags for tag in tags)
]
return scenarios
async def run_scenario(self, scenario_name: str) -> Dict[str, Any]:
"""運行單個測試場景"""
scenario = self.get_scenario(scenario_name)
if not scenario:
return {
"success": False,
"errors": [f"未找到測試場景: {scenario_name}"]
}
debug_log(f"🧪 運行測試場景: {scenario.name}")
debug_log(f" 描述: {scenario.description}")
client = MCPTestClient(self.config)
try:
# 執行設置
if scenario.setup:
await scenario.setup()
# 運行測試
result = await TestUtils.timeout_wrapper(
scenario.run(client),
scenario.timeout,
f"測試場景 '{scenario.name}' 超時"
)
result["scenario_name"] = scenario.name
result["scenario_description"] = scenario.description
return result
except Exception as e:
debug_log(f"❌ 測試場景 '{scenario.name}' 執行失敗: {e}")
return {
"success": False,
"scenario_name": scenario.name,
"scenario_description": scenario.description,
"errors": [f"執行異常: {str(e)}"]
}
finally:
# 執行清理
if scenario.teardown:
try:
await scenario.teardown()
except Exception as e:
debug_log(f"⚠️ 測試場景 '{scenario.name}' 清理失敗: {e}")
async def run_all_scenarios(self, tags: Optional[List[str]] = None) -> Dict[str, Any]:
"""運行所有測試場景"""
scenarios = self.list_scenarios(tags)
if not scenarios:
return {
"success": False,
"total_scenarios": 0,
"passed_scenarios": 0,
"failed_scenarios": 0,
"results": [],
"errors": ["沒有找到匹配的測試場景"]
}
debug_log(f"🚀 開始運行 {len(scenarios)} 個測試場景...")
results = []
passed_count = 0
failed_count = 0
for scenario in scenarios:
result = await self.run_scenario(scenario.name)
results.append(result)
if result.get("success", False):
passed_count += 1
debug_log(f"{scenario.name}: 通過")
else:
failed_count += 1
debug_log(f"{scenario.name}: 失敗")
overall_success = failed_count == 0
debug_log(f"📊 測試完成: {passed_count}/{len(scenarios)} 通過")
return {
"success": overall_success,
"total_scenarios": len(scenarios),
"passed_scenarios": passed_count,
"failed_scenarios": failed_count,
"results": results
}

View File

@ -0,0 +1,266 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
測試工具函數
============
提供 MCP 測試框架使用的通用工具函數
"""
import asyncio
import time
import json
import uuid
import socket
import psutil
import threading
from typing import Dict, Any, Optional, List, Callable, Awaitable
from pathlib import Path
from datetime import datetime, timedelta
from contextlib import asynccontextmanager
from ..debug import debug_log
class TestUtils:
"""測試工具類"""
@staticmethod
def generate_test_id() -> str:
"""生成測試 ID"""
return f"test_{uuid.uuid4().hex[:8]}"
@staticmethod
def generate_session_id() -> str:
"""生成會話 ID"""
return str(uuid.uuid4())
@staticmethod
def get_timestamp() -> str:
"""獲取當前時間戳"""
return datetime.now().isoformat()
@staticmethod
def format_duration(seconds: float) -> str:
"""格式化持續時間"""
if seconds < 1:
return f"{seconds*1000:.1f}ms"
elif seconds < 60:
return f"{seconds:.2f}s"
else:
minutes = int(seconds // 60)
remaining_seconds = seconds % 60
return f"{minutes}m {remaining_seconds:.1f}s"
@staticmethod
def find_free_port(start_port: int = 8765, max_attempts: int = 100) -> int:
"""尋找可用端口"""
for port in range(start_port, start_port + max_attempts):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('127.0.0.1', port))
return port
except OSError:
continue
raise RuntimeError(f"無法找到可用端口 (嘗試範圍: {start_port}-{start_port + max_attempts})")
@staticmethod
def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
"""檢查端口是否開放"""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(timeout)
result = s.connect_ex((host, port))
return result == 0
except Exception:
return False
@staticmethod
async def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
"""等待端口開放"""
start_time = time.time()
while time.time() - start_time < timeout:
if TestUtils.is_port_open(host, port):
return True
await asyncio.sleep(interval)
return False
@staticmethod
def get_system_info() -> Dict[str, Any]:
"""獲取系統信息"""
try:
return {
'cpu_count': psutil.cpu_count(),
'memory_total': psutil.virtual_memory().total,
'memory_available': psutil.virtual_memory().available,
'disk_usage': psutil.disk_usage('/').percent if hasattr(psutil, 'disk_usage') else None,
'platform': psutil.WINDOWS if hasattr(psutil, 'WINDOWS') else 'unknown'
}
except Exception as e:
debug_log(f"獲取系統信息失敗: {e}")
return {}
@staticmethod
def measure_memory_usage() -> Dict[str, float]:
"""測量內存使用情況"""
try:
process = psutil.Process()
memory_info = process.memory_info()
return {
'rss': memory_info.rss / 1024 / 1024, # MB
'vms': memory_info.vms / 1024 / 1024, # MB
'percent': process.memory_percent()
}
except Exception as e:
debug_log(f"測量內存使用失敗: {e}")
return {}
@staticmethod
async def timeout_wrapper(coro: Awaitable, timeout: float, error_message: str = "操作超時"):
"""為協程添加超時包裝"""
try:
return await asyncio.wait_for(coro, timeout=timeout)
except asyncio.TimeoutError:
raise TimeoutError(f"{error_message} (超時: {timeout}s)")
@staticmethod
def safe_json_loads(data: str) -> Optional[Dict[str, Any]]:
"""安全的 JSON 解析"""
try:
return json.loads(data)
except (json.JSONDecodeError, TypeError) as e:
debug_log(f"JSON 解析失敗: {e}")
return None
@staticmethod
def safe_json_dumps(data: Any, indent: int = 2) -> str:
"""安全的 JSON 序列化"""
try:
return json.dumps(data, indent=indent, ensure_ascii=False, default=str)
except (TypeError, ValueError) as e:
debug_log(f"JSON 序列化失敗: {e}")
return str(data)
@staticmethod
def create_test_directory(base_dir: str = "test_temp") -> Path:
"""創建測試目錄"""
test_dir = Path(base_dir) / f"test_{uuid.uuid4().hex[:8]}"
test_dir.mkdir(parents=True, exist_ok=True)
return test_dir
@staticmethod
def cleanup_test_directory(test_dir: Path):
"""清理測試目錄"""
try:
if test_dir.exists() and test_dir.is_dir():
import shutil
shutil.rmtree(test_dir)
except Exception as e:
debug_log(f"清理測試目錄失敗: {e}")
class PerformanceMonitor:
"""性能監控器"""
def __init__(self):
self.start_time: Optional[float] = None
self.end_time: Optional[float] = None
self.memory_start: Optional[Dict[str, float]] = None
self.memory_end: Optional[Dict[str, float]] = None
self.checkpoints: List[Dict[str, Any]] = []
def start(self):
"""開始監控"""
self.start_time = time.time()
self.memory_start = TestUtils.measure_memory_usage()
self.checkpoints = []
def checkpoint(self, name: str, data: Optional[Dict[str, Any]] = None):
"""添加檢查點"""
if self.start_time is None:
return
checkpoint = {
'name': name,
'timestamp': time.time(),
'elapsed': time.time() - self.start_time,
'memory': TestUtils.measure_memory_usage(),
'data': data or {}
}
self.checkpoints.append(checkpoint)
def stop(self):
"""停止監控"""
self.end_time = time.time()
self.memory_end = TestUtils.measure_memory_usage()
def get_summary(self) -> Dict[str, Any]:
"""獲取監控摘要"""
if self.start_time is None or self.end_time is None:
return {}
total_duration = self.end_time - self.start_time
memory_diff = {}
if self.memory_start and self.memory_end:
for key in self.memory_start:
if key in self.memory_end:
memory_diff[f"memory_{key}_diff"] = self.memory_end[key] - self.memory_start[key]
return {
'total_duration': total_duration,
'total_duration_formatted': TestUtils.format_duration(total_duration),
'memory_start': self.memory_start,
'memory_end': self.memory_end,
'memory_diff': memory_diff,
'checkpoints_count': len(self.checkpoints),
'checkpoints': self.checkpoints
}
@asynccontextmanager
async def performance_context(name: str = "test"):
"""性能監控上下文管理器"""
monitor = PerformanceMonitor()
monitor.start()
try:
yield monitor
finally:
monitor.stop()
summary = monitor.get_summary()
debug_log(f"性能監控 [{name}]: {TestUtils.format_duration(summary.get('total_duration', 0))}")
class AsyncEventWaiter:
"""異步事件等待器"""
def __init__(self):
self.events: Dict[str, asyncio.Event] = {}
self.results: Dict[str, Any] = {}
def create_event(self, event_name: str):
"""創建事件"""
self.events[event_name] = asyncio.Event()
def set_event(self, event_name: str, result: Any = None):
"""設置事件"""
if event_name in self.events:
self.results[event_name] = result
self.events[event_name].set()
async def wait_for_event(self, event_name: str, timeout: float = 30.0) -> Any:
"""等待事件"""
if event_name not in self.events:
self.create_event(event_name)
try:
await asyncio.wait_for(self.events[event_name].wait(), timeout=timeout)
return self.results.get(event_name)
except asyncio.TimeoutError:
raise TimeoutError(f"等待事件 '{event_name}' 超時 ({timeout}s)")
def clear_event(self, event_name: str):
"""清除事件"""
if event_name in self.events:
self.events[event_name].clear()
self.results.pop(event_name, None)

View File

@ -0,0 +1,394 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
測試結果驗證器
==============
驗證 MCP 測試結果是否符合規範和預期
"""
import json
import re
from typing import Dict, Any, List, Optional, Union
from dataclasses import dataclass
from pathlib import Path
from .config import TestConfig, DEFAULT_CONFIG
from .utils import TestUtils
from ..debug import debug_log
@dataclass
class ValidationResult:
"""驗證結果"""
valid: bool
errors: List[str]
warnings: List[str]
details: Dict[str, Any]
def add_error(self, message: str):
"""添加錯誤"""
self.errors.append(message)
self.valid = False
def add_warning(self, message: str):
"""添加警告"""
self.warnings.append(message)
def add_detail(self, key: str, value: Any):
"""添加詳細信息"""
self.details[key] = value
class MCPMessageValidator:
"""MCP 消息驗證器"""
@staticmethod
def validate_json_rpc(message: Dict[str, Any]) -> ValidationResult:
"""驗證 JSON-RPC 2.0 格式"""
result = ValidationResult(True, [], [], {})
# 檢查必需字段
if "jsonrpc" not in message:
result.add_error("缺少 'jsonrpc' 字段")
elif message["jsonrpc"] != "2.0":
result.add_error(f"無效的 jsonrpc 版本: {message['jsonrpc']}")
# 檢查消息類型
is_request = "method" in message
is_response = "result" in message or "error" in message
is_notification = is_request and "id" not in message
if not (is_request or is_response):
result.add_error("消息既不是請求也不是響應")
if is_request and is_response:
result.add_error("消息不能同時是請求和響應")
# 驗證請求格式
if is_request:
if not isinstance(message.get("method"), str):
result.add_error("method 字段必須是字符串")
if not is_notification and "id" not in message:
result.add_error("非通知請求必須包含 id 字段")
# 驗證響應格式
if is_response:
if "id" not in message:
result.add_error("響應必須包含 id 字段")
if "result" in message and "error" in message:
result.add_error("響應不能同時包含 result 和 error")
if "result" not in message and "error" not in message:
result.add_error("響應必須包含 result 或 error")
# 驗證錯誤格式
if "error" in message:
error = message["error"]
if not isinstance(error, dict):
result.add_error("error 字段必須是對象")
else:
if "code" not in error:
result.add_error("error 對象必須包含 code 字段")
elif not isinstance(error["code"], int):
result.add_error("error.code 必須是整數")
if "message" not in error:
result.add_error("error 對象必須包含 message 字段")
elif not isinstance(error["message"], str):
result.add_error("error.message 必須是字符串")
result.add_detail("message_type", "request" if is_request else "response")
result.add_detail("is_notification", is_notification)
return result
@staticmethod
def validate_mcp_initialize(message: Dict[str, Any]) -> ValidationResult:
"""驗證 MCP 初始化消息"""
result = ValidationResult(True, [], [], {})
# 先驗證 JSON-RPC 格式
json_rpc_result = MCPMessageValidator.validate_json_rpc(message)
result.errors.extend(json_rpc_result.errors)
result.warnings.extend(json_rpc_result.warnings)
if not json_rpc_result.valid:
result.valid = False
return result
# 驗證初始化特定字段
if message.get("method") == "initialize":
params = message.get("params", {})
if "protocolVersion" not in params:
result.add_error("初始化請求必須包含 protocolVersion")
if "clientInfo" not in params:
result.add_error("初始化請求必須包含 clientInfo")
else:
client_info = params["clientInfo"]
if not isinstance(client_info, dict):
result.add_error("clientInfo 必須是對象")
else:
if "name" not in client_info:
result.add_error("clientInfo 必須包含 name")
if "version" not in client_info:
result.add_error("clientInfo 必須包含 version")
elif "result" in message:
# 驗證初始化響應
result_data = message.get("result", {})
if "serverInfo" not in result_data:
result.add_warning("初始化響應建議包含 serverInfo")
if "capabilities" not in result_data:
result.add_warning("初始化響應建議包含 capabilities")
return result
@staticmethod
def validate_tools_list(message: Dict[str, Any]) -> ValidationResult:
"""驗證工具列表消息"""
result = ValidationResult(True, [], [], {})
# 先驗證 JSON-RPC 格式
json_rpc_result = MCPMessageValidator.validate_json_rpc(message)
result.errors.extend(json_rpc_result.errors)
result.warnings.extend(json_rpc_result.warnings)
if not json_rpc_result.valid:
result.valid = False
return result
# 驗證工具列表響應
if "result" in message:
result_data = message.get("result", {})
if "tools" not in result_data:
result.add_error("工具列表響應必須包含 tools 字段")
else:
tools = result_data["tools"]
if not isinstance(tools, list):
result.add_error("tools 字段必須是數組")
else:
for i, tool in enumerate(tools):
if not isinstance(tool, dict):
result.add_error(f"工具 {i} 必須是對象")
continue
if "name" not in tool:
result.add_error(f"工具 {i} 必須包含 name 字段")
if "description" not in tool:
result.add_warning(f"工具 {i} 建議包含 description 字段")
if "inputSchema" not in tool:
result.add_warning(f"工具 {i} 建議包含 inputSchema 字段")
result.add_detail("tools_count", len(tools))
return result
class TestResultValidator:
"""測試結果驗證器"""
def __init__(self, config: Optional[TestConfig] = None):
self.config = config or DEFAULT_CONFIG
def validate_test_result(self, test_result: Dict[str, Any]) -> ValidationResult:
"""驗證測試結果"""
result = ValidationResult(True, [], [], {})
# 檢查必需字段
required_fields = ["success", "steps", "performance", "errors"]
for field in required_fields:
if field not in test_result:
result.add_error(f"測試結果缺少必需字段: {field}")
# 驗證成功標誌
if "success" in test_result:
if not isinstance(test_result["success"], bool):
result.add_error("success 字段必須是布爾值")
# 驗證步驟信息
if "steps" in test_result:
steps = test_result["steps"]
if not isinstance(steps, dict):
result.add_error("steps 字段必須是對象")
else:
result.add_detail("completed_steps", list(steps.keys()))
result.add_detail("steps_count", len(steps))
# 驗證錯誤信息
if "errors" in test_result:
errors = test_result["errors"]
if not isinstance(errors, list):
result.add_error("errors 字段必須是數組")
else:
result.add_detail("error_count", len(errors))
if len(errors) > 0 and test_result.get("success", False):
result.add_warning("測試標記為成功但包含錯誤信息")
# 驗證性能數據
if "performance" in test_result:
performance = test_result["performance"]
if not isinstance(performance, dict):
result.add_error("performance 字段必須是對象")
else:
self._validate_performance_data(performance, result)
return result
def _validate_performance_data(self, performance: Dict[str, Any], result: ValidationResult):
"""驗證性能數據"""
# 檢查時間相關字段
time_fields = ["total_duration", "total_time"]
for field in time_fields:
if field in performance:
value = performance[field]
if not isinstance(value, (int, float)):
result.add_error(f"性能字段 {field} 必須是數字")
elif value < 0:
result.add_error(f"性能字段 {field} 不能為負數")
elif value > self.config.test_timeout:
result.add_warning(f"性能字段 {field} 超過測試超時時間")
# 檢查內存相關字段
memory_fields = ["memory_start", "memory_end", "memory_diff"]
for field in memory_fields:
if field in performance:
value = performance[field]
if not isinstance(value, dict):
result.add_warning(f"性能字段 {field} 應該是對象")
# 檢查檢查點數據
if "checkpoints" in performance:
checkpoints = performance["checkpoints"]
if not isinstance(checkpoints, list):
result.add_error("checkpoints 字段必須是數組")
else:
result.add_detail("checkpoints_count", len(checkpoints))
def validate_interactive_feedback_result(self, feedback_result: Dict[str, Any]) -> ValidationResult:
"""驗證互動回饋結果"""
result = ValidationResult(True, [], [], {})
# 檢查是否有錯誤
if "error" in feedback_result:
result.add_detail("has_error", True)
result.add_detail("error_message", feedback_result["error"])
return result
# 檢查預期字段
expected_fields = ["command_logs", "interactive_feedback", "images"]
for field in expected_fields:
if field not in feedback_result:
result.add_warning(f"互動回饋結果建議包含 {field} 字段")
# 驗證命令日誌
if "command_logs" in feedback_result:
logs = feedback_result["command_logs"]
if not isinstance(logs, str):
result.add_error("command_logs 字段必須是字符串")
# 驗證互動回饋
if "interactive_feedback" in feedback_result:
feedback = feedback_result["interactive_feedback"]
if not isinstance(feedback, str):
result.add_error("interactive_feedback 字段必須是字符串")
elif len(feedback.strip()) == 0:
result.add_warning("interactive_feedback 為空")
# 驗證圖片數據
if "images" in feedback_result:
images = feedback_result["images"]
if not isinstance(images, list):
result.add_error("images 字段必須是數組")
else:
result.add_detail("images_count", len(images))
for i, image in enumerate(images):
if not isinstance(image, dict):
result.add_error(f"圖片 {i} 必須是對象")
continue
if "data" not in image:
result.add_error(f"圖片 {i} 必須包含 data 字段")
if "media_type" not in image:
result.add_error(f"圖片 {i} 必須包含 media_type 字段")
return result
class TestValidators:
"""測試驗證器集合"""
def __init__(self, config: Optional[TestConfig] = None):
self.config = config or DEFAULT_CONFIG
self.message_validator = MCPMessageValidator()
self.result_validator = TestResultValidator(config)
def validate_all(self, test_data: Dict[str, Any]) -> Dict[str, ValidationResult]:
"""驗證所有測試數據"""
results = {}
# 驗證測試結果
if "test_result" in test_data:
results["test_result"] = self.result_validator.validate_test_result(
test_data["test_result"]
)
# 驗證 MCP 消息
if "mcp_messages" in test_data:
message_results = []
for i, message in enumerate(test_data["mcp_messages"]):
msg_result = self.message_validator.validate_json_rpc(message)
msg_result.add_detail("message_index", i)
message_results.append(msg_result)
results["mcp_messages"] = message_results
# 驗證互動回饋結果
if "feedback_result" in test_data:
results["feedback_result"] = self.result_validator.validate_interactive_feedback_result(
test_data["feedback_result"]
)
return results
def get_validation_summary(self, validation_results: Dict[str, ValidationResult]) -> Dict[str, Any]:
"""獲取驗證摘要"""
total_errors = 0
total_warnings = 0
valid_count = 0
total_count = 0
for key, result in validation_results.items():
if isinstance(result, list):
# 處理消息列表
for msg_result in result:
total_errors += len(msg_result.errors)
total_warnings += len(msg_result.warnings)
if msg_result.valid:
valid_count += 1
total_count += 1
else:
# 處理單個結果
total_errors += len(result.errors)
total_warnings += len(result.warnings)
if result.valid:
valid_count += 1
total_count += 1
return {
"total_validations": total_count,
"valid_count": valid_count,
"invalid_count": total_count - valid_count,
"total_errors": total_errors,
"total_warnings": total_warnings,
"success_rate": valid_count / total_count if total_count > 0 else 0
}

View File

@ -33,25 +33,35 @@ from ..i18n import get_i18n_manager
class WebUIManager: class WebUIManager:
"""Web UI 管理器""" """Web UI 管理器 - 重構為單一活躍會話模式"""
def __init__(self, host: str = "127.0.0.1", port: int = None): def __init__(self, host: str = "127.0.0.1", port: int = None):
self.host = host self.host = host
# 優先使用固定端口 8765確保 localStorage 的一致性 # 優先使用固定端口 8765確保 localStorage 的一致性
self.port = port or find_free_port(preferred_port=8765) self.port = port or find_free_port(preferred_port=8765)
self.app = FastAPI(title="MCP Feedback Enhanced") self.app = FastAPI(title="MCP Feedback Enhanced")
self.sessions: Dict[str, WebFeedbackSession] = {}
# 重構:使用單一活躍會話而非會話字典
self.current_session: Optional[WebFeedbackSession] = None
self.sessions: Dict[str, WebFeedbackSession] = {} # 保留用於向後兼容
# 全局標籤頁狀態管理 - 跨會話保持
self.global_active_tabs: Dict[str, dict] = {}
# 會話更新通知標記
self._pending_session_update = False
self.server_thread = None self.server_thread = None
self.server_process = None self.server_process = None
self.i18n = get_i18n_manager() self.i18n = get_i18n_manager()
# 設置靜態文件和模板 # 設置靜態文件和模板
self._setup_static_files() self._setup_static_files()
self._setup_templates() self._setup_templates()
# 設置路由 # 設置路由
setup_routes(self) setup_routes(self)
debug_log(f"WebUIManager 初始化完成,將在 {self.host}:{self.port} 啟動") debug_log(f"WebUIManager 初始化完成,將在 {self.host}:{self.port} 啟動")
def _setup_static_files(self): def _setup_static_files(self):
@ -73,25 +83,129 @@ class WebUIManager:
raise RuntimeError(f"Templates directory not found: {web_templates_path}") raise RuntimeError(f"Templates directory not found: {web_templates_path}")
def create_session(self, project_directory: str, summary: str) -> str: def create_session(self, project_directory: str, summary: str) -> str:
"""創建新的回饋會話""" """創建新的回饋會話 - 重構為單一活躍會話模式,保留標籤頁狀態"""
# 保存舊會話的 WebSocket 連接以便發送更新通知
old_websocket = None
if self.current_session and self.current_session.websocket:
old_websocket = self.current_session.websocket
debug_log("保存舊會話的 WebSocket 連接以發送更新通知")
# 如果已有活躍會話,先保存其標籤頁狀態到全局狀態
if self.current_session:
debug_log("保存現有會話的標籤頁狀態並清理會話")
# 保存標籤頁狀態到全局
if hasattr(self.current_session, 'active_tabs'):
self._merge_tabs_to_global(self.current_session.active_tabs)
# 同步清理會話資源(但保留 WebSocket 連接)
self.current_session._cleanup_sync()
session_id = str(uuid.uuid4()) session_id = str(uuid.uuid4())
session = WebFeedbackSession(session_id, project_directory, summary) session = WebFeedbackSession(session_id, project_directory, summary)
# 將全局標籤頁狀態繼承到新會話
session.active_tabs = self.global_active_tabs.copy()
# 設置為當前活躍會話
self.current_session = session
# 同時保存到字典中以保持向後兼容
self.sessions[session_id] = session self.sessions[session_id] = session
debug_log(f"創建回饋會話: {session_id}")
debug_log(f"創建新的活躍會話: {session_id}")
debug_log(f"繼承 {len(session.active_tabs)} 個活躍標籤頁")
# 如果有舊的 WebSocket 連接,立即發送會話更新通知
if old_websocket:
self._old_websocket_for_update = old_websocket
self._new_session_for_update = session
debug_log("已保存舊 WebSocket 連接,準備發送會話更新通知")
else:
# 標記需要發送會話更新通知(當新 WebSocket 連接建立時)
self._pending_session_update = True
return session_id return session_id
def get_session(self, session_id: str) -> Optional[WebFeedbackSession]: def get_session(self, session_id: str) -> Optional[WebFeedbackSession]:
"""獲取回饋會話""" """獲取回饋會話 - 保持向後兼容"""
return self.sessions.get(session_id) return self.sessions.get(session_id)
def get_current_session(self) -> Optional[WebFeedbackSession]:
"""獲取當前活躍會話"""
return self.current_session
def remove_session(self, session_id: str): def remove_session(self, session_id: str):
"""移除回饋會話""" """移除回饋會話"""
if session_id in self.sessions: if session_id in self.sessions:
session = self.sessions[session_id] session = self.sessions[session_id]
session.cleanup() session.cleanup()
del self.sessions[session_id] del self.sessions[session_id]
# 如果移除的是當前活躍會話,清空當前會話
if self.current_session and self.current_session.session_id == session_id:
self.current_session = None
debug_log("清空當前活躍會話")
debug_log(f"移除回饋會話: {session_id}") debug_log(f"移除回饋會話: {session_id}")
def clear_current_session(self):
"""清空當前活躍會話"""
if self.current_session:
session_id = self.current_session.session_id
self.current_session.cleanup()
self.current_session = None
# 同時從字典中移除
if session_id in self.sessions:
del self.sessions[session_id]
debug_log("已清空當前活躍會話")
def _merge_tabs_to_global(self, session_tabs: dict):
"""將會話的標籤頁狀態合併到全局狀態"""
current_time = time.time()
expired_threshold = 60 # 60秒過期閾值
# 清理過期的全局標籤頁
self.global_active_tabs = {
tab_id: tab_info
for tab_id, tab_info in self.global_active_tabs.items()
if current_time - tab_info.get('last_seen', 0) <= expired_threshold
}
# 合併會話標籤頁到全局
for tab_id, tab_info in session_tabs.items():
if current_time - tab_info.get('last_seen', 0) <= expired_threshold:
self.global_active_tabs[tab_id] = tab_info
debug_log(f"合併標籤頁狀態,全局活躍標籤頁數量: {len(self.global_active_tabs)}")
def get_global_active_tabs_count(self) -> int:
"""獲取全局活躍標籤頁數量"""
current_time = time.time()
expired_threshold = 60
# 清理過期標籤頁並返回數量
valid_tabs = {
tab_id: tab_info
for tab_id, tab_info in self.global_active_tabs.items()
if current_time - tab_info.get('last_seen', 0) <= expired_threshold
}
self.global_active_tabs = valid_tabs
return len(valid_tabs)
async def broadcast_to_active_tabs(self, message: dict):
"""向所有活躍標籤頁廣播消息"""
if not self.current_session or not self.current_session.websocket:
debug_log("沒有活躍的 WebSocket 連接,無法廣播消息")
return
try:
await self.current_session.websocket.send_json(message)
debug_log(f"已廣播消息到活躍標籤頁: {message.get('type', 'unknown')}")
except Exception as e:
debug_log(f"廣播消息失敗: {e}")
def start_server(self): def start_server(self):
"""啟動 Web 伺服器""" """啟動 Web 伺服器"""
def run_server_with_retry(): def run_server_with_retry():
@ -146,6 +260,126 @@ class WebUIManager:
except Exception as e: except Exception as e:
debug_log(f"無法開啟瀏覽器: {e}") debug_log(f"無法開啟瀏覽器: {e}")
async def smart_open_browser(self, url: str) -> bool:
"""智能開啟瀏覽器 - 檢測是否已有活躍標籤頁
Returns:
bool: True 表示檢測到活躍標籤頁False 表示開啟了新視窗
"""
import asyncio
import aiohttp
try:
# 檢查是否有活躍標籤頁
has_active_tabs = await self._check_active_tabs()
if has_active_tabs:
debug_log("檢測到活躍標籤頁,不開啟新瀏覽器視窗")
debug_log(f"用戶可以在現有標籤頁中查看更新:{url}")
return True
# 沒有活躍標籤頁,開啟新瀏覽器視窗
debug_log("沒有檢測到活躍標籤頁,開啟新瀏覽器視窗")
self.open_browser(url)
return False
except Exception as e:
debug_log(f"智能瀏覽器開啟失敗,回退到普通開啟:{e}")
self.open_browser(url)
return False
async def notify_session_update(self, session):
"""向活躍標籤頁發送會話更新通知"""
try:
# 向所有活躍的 WebSocket 連接發送會話更新通知
await self.broadcast_to_active_tabs({
"type": "session_updated",
"message": "新會話已創建,正在更新頁面內容",
"session_info": {
"project_directory": session.project_directory,
"summary": session.summary,
"session_id": session.session_id
}
})
debug_log("會話更新通知已發送到所有活躍標籤頁")
except Exception as e:
debug_log(f"發送會話更新通知失敗: {e}")
async def _send_immediate_session_update(self):
"""立即發送會話更新通知(使用舊的 WebSocket 連接)"""
try:
# 檢查是否有保存的舊 WebSocket 連接
if hasattr(self, '_old_websocket_for_update') and hasattr(self, '_new_session_for_update'):
old_websocket = self._old_websocket_for_update
new_session = self._new_session_for_update
# 發送會話更新通知
await old_websocket.send_json({
"type": "session_updated",
"message": "新會話已創建,正在更新頁面內容",
"session_info": {
"project_directory": new_session.project_directory,
"summary": new_session.summary,
"session_id": new_session.session_id
}
})
debug_log("已通過舊 WebSocket 連接發送會話更新通知")
# 清理臨時變數
delattr(self, '_old_websocket_for_update')
delattr(self, '_new_session_for_update')
# 延遲一小段時間讓前端處理消息,然後關閉舊連接
await asyncio.sleep(0.1)
try:
await old_websocket.close()
debug_log("已關閉舊 WebSocket 連接")
except Exception as e:
debug_log(f"關閉舊 WebSocket 連接失敗: {e}")
else:
# 沒有舊連接,設置待更新標記
self._pending_session_update = True
debug_log("沒有舊 WebSocket 連接,設置待更新標記")
except Exception as e:
debug_log(f"立即發送會話更新通知失敗: {e}")
# 回退到待更新標記
self._pending_session_update = True
async def _check_active_tabs(self) -> bool:
"""檢查是否有活躍標籤頁 - 優先檢查全局狀態,回退到 API"""
try:
# 首先檢查全局標籤頁狀態
global_count = self.get_global_active_tabs_count()
if global_count > 0:
debug_log(f"檢測到 {global_count} 個全局活躍標籤頁")
return True
# 如果全局狀態沒有活躍標籤頁,嘗試通過 API 檢查
# 等待一小段時間讓服務器完全啟動
await asyncio.sleep(0.5)
# 調用活躍標籤頁 API
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get(f"{self.get_server_url()}/api/active-tabs", timeout=2) as response:
if response.status == 200:
data = await response.json()
tab_count = data.get("count", 0)
debug_log(f"API 檢測到 {tab_count} 個活躍標籤頁")
return tab_count > 0
else:
debug_log(f"檢查活躍標籤頁失敗,狀態碼:{response.status}")
return False
except asyncio.TimeoutError:
debug_log("檢查活躍標籤頁超時")
return False
except Exception as e:
debug_log(f"檢查活躍標籤頁時發生錯誤:{e}")
return False
def get_server_url(self) -> str: def get_server_url(self) -> str:
"""獲取伺服器 URL""" """獲取伺服器 URL"""
return f"http://{self.host}:{self.port}" return f"http://{self.host}:{self.port}"
@ -176,52 +410,56 @@ def get_web_ui_manager() -> WebUIManager:
async def launch_web_feedback_ui(project_directory: str, summary: str, timeout: int = 600) -> dict: async def launch_web_feedback_ui(project_directory: str, summary: str, timeout: int = 600) -> dict:
""" """
啟動 Web 回饋介面並等待用戶回饋 啟動 Web 回饋介面並等待用戶回饋 - 重構為使用根路徑
Args: Args:
project_directory: 專案目錄路徑 project_directory: 專案目錄路徑
summary: AI 工作摘要 summary: AI 工作摘要
timeout: 超時時間 timeout: 超時時間
Returns: Returns:
dict: 回饋結果包含 logsinteractive_feedback images dict: 回饋結果包含 logsinteractive_feedback images
""" """
manager = get_web_ui_manager() manager = get_web_ui_manager()
# 創建會話 # 創建或更新當前活躍會話
session_id = manager.create_session(project_directory, summary) session_id = manager.create_session(project_directory, summary)
session = manager.get_session(session_id) session = manager.get_current_session()
if not session: if not session:
raise RuntimeError("無法創建回饋會話") raise RuntimeError("無法創建回饋會話")
# 啟動伺服器(如果尚未啟動) # 啟動伺服器(如果尚未啟動)
if not manager.server_thread or not manager.server_thread.is_alive(): if not manager.server_thread or not manager.server_thread.is_alive():
manager.start_server() manager.start_server()
# 構建完整 URL 並開啟瀏覽器 # 使用根路徑 URL 並智能開啟瀏覽器
feedback_url = f"{manager.get_server_url()}/session/{session_id}" feedback_url = manager.get_server_url() # 直接使用根路徑
manager.open_browser(feedback_url) has_active_tabs = await manager.smart_open_browser(feedback_url)
debug_log(f"[DEBUG] 服務器地址: {feedback_url}")
# 如果檢測到活躍標籤頁但沒有開啟新視窗,立即發送會話更新通知
if has_active_tabs:
await manager._send_immediate_session_update()
debug_log("已向活躍標籤頁發送會話更新通知")
try: try:
# 等待用戶回饋,傳遞 timeout 參數 # 等待用戶回饋,傳遞 timeout 參數
result = await session.wait_for_feedback(timeout) result = await session.wait_for_feedback(timeout)
debug_log(f"收到用戶回饋,會話: {session_id}") debug_log(f"收到用戶回饋")
return result return result
except TimeoutError: except TimeoutError:
debug_log(f"會話 {session_id} 超時") debug_log(f"會話超時")
# 資源已在 wait_for_feedback 中清理,這裡只需要記錄和重新拋出 # 資源已在 wait_for_feedback 中清理,這裡只需要記錄和重新拋出
raise raise
except Exception as e: except Exception as e:
debug_log(f"會話 {session_id} 發生錯誤: {e}") debug_log(f"會話發生錯誤: {e}")
raise raise
finally: finally:
# 清理會話(無論成功還是失敗) # 注意:不再自動清理會話和停止服務器,保持持久性
manager.remove_session(session_id) # 會話將保持活躍狀態,等待下次 MCP 調用
# 如果沒有其他活躍會話,停止服務器 debug_log("會話保持活躍狀態,等待下次 MCP 調用")
if len(manager.sessions) == 0:
debug_log("沒有活躍會話,停止 Web UI 服務器")
stop_web_ui()
def stop_web_ui(): def stop_web_ui():

View File

@ -11,6 +11,7 @@ import asyncio
import base64 import base64
import subprocess import subprocess
import threading import threading
from enum import Enum
from pathlib import Path from pathlib import Path
from typing import Dict, List, Optional from typing import Dict, List, Optional
@ -18,6 +19,16 @@ from fastapi import WebSocket
from ...debug import web_debug_log as debug_log from ...debug import web_debug_log as debug_log
class SessionStatus(Enum):
"""會話狀態枚舉"""
WAITING = "waiting" # 等待中
ACTIVE = "active" # 活躍中
FEEDBACK_SUBMITTED = "feedback_submitted" # 已提交反饋
COMPLETED = "completed" # 已完成
TIMEOUT = "timeout" # 超時
ERROR = "error" # 錯誤
# 常數定義 # 常數定義
MAX_IMAGE_SIZE = 1 * 1024 * 1024 # 1MB 圖片大小限制 MAX_IMAGE_SIZE = 1 * 1024 * 1024 # 1MB 圖片大小限制
SUPPORTED_IMAGE_TYPES = {'image/png', 'image/jpeg', 'image/jpg', 'image/gif', 'image/bmp', 'image/webp'} SUPPORTED_IMAGE_TYPES = {'image/png', 'image/jpeg', 'image/jpg', 'image/gif', 'image/bmp', 'image/webp'}
@ -39,10 +50,42 @@ class WebFeedbackSession:
self.process: Optional[subprocess.Popen] = None self.process: Optional[subprocess.Popen] = None
self.command_logs = [] self.command_logs = []
self._cleanup_done = False # 防止重複清理 self._cleanup_done = False # 防止重複清理
# 新增:會話狀態管理
self.status = SessionStatus.WAITING
self.status_message = "等待用戶回饋"
self.created_at = asyncio.get_event_loop().time()
self.last_activity = self.created_at
# 確保臨時目錄存在 # 確保臨時目錄存在
TEMP_DIR.mkdir(parents=True, exist_ok=True) TEMP_DIR.mkdir(parents=True, exist_ok=True)
def update_status(self, status: SessionStatus, message: str = None):
"""更新會話狀態"""
self.status = status
if message:
self.status_message = message
self.last_activity = asyncio.get_event_loop().time()
debug_log(f"會話 {self.session_id} 狀態更新: {status.value} - {self.status_message}")
def get_status_info(self) -> dict:
"""獲取會話狀態信息"""
return {
"status": self.status.value,
"message": self.status_message,
"feedback_completed": self.feedback_completed.is_set(),
"has_websocket": self.websocket is not None,
"created_at": self.created_at,
"last_activity": self.last_activity,
"project_directory": self.project_directory,
"summary": self.summary,
"session_id": self.session_id
}
def is_active(self) -> bool:
"""檢查會話是否活躍"""
return self.status in [SessionStatus.WAITING, SessionStatus.ACTIVE, SessionStatus.FEEDBACK_SUBMITTED]
async def wait_for_feedback(self, timeout: int = 600) -> dict: async def wait_for_feedback(self, timeout: int = 600) -> dict:
""" """
等待用戶回饋包含圖片支援超時自動清理 等待用戶回饋包含圖片支援超時自動清理
@ -102,13 +145,24 @@ class WebFeedbackSession:
# 先設置設定,再處理圖片(因為處理圖片時需要用到設定) # 先設置設定,再處理圖片(因為處理圖片時需要用到設定)
self.settings = settings or {} self.settings = settings or {}
self.images = self._process_images(images) self.images = self._process_images(images)
# 更新狀態為已提交反饋
self.update_status(SessionStatus.FEEDBACK_SUBMITTED, "已送出反饋,等待下次 MCP 調用")
self.feedback_completed.set() self.feedback_completed.set()
# 發送反饋已收到的消息給前端
if self.websocket: if self.websocket:
try: try:
await self.websocket.close() await self.websocket.send_json({
except: "type": "feedback_received",
pass "message": "反饋已成功提交",
"status": self.status.value
})
except Exception as e:
debug_log(f"發送反饋確認失敗: {e}")
# 重構:不再自動關閉 WebSocket保持連接以支援頁面持久性
def _process_images(self, images: List[dict]) -> List[dict]: def _process_images(self, images: List[dict]) -> List[dict]:
""" """
@ -304,14 +358,14 @@ class WebFeedbackSession:
except Exception as e: except Exception as e:
debug_log(f"清理會話 {self.session_id} 資源時發生錯誤: {e}") debug_log(f"清理會話 {self.session_id} 資源時發生錯誤: {e}")
def cleanup(self): def _cleanup_sync(self):
"""同步清理會話資源(保持向後兼容""" """同步清理會話資源(但保留 WebSocket 連接"""
if self._cleanup_done: if self._cleanup_done:
return return
self._cleanup_done = True debug_log(f"同步清理會話 {self.session_id} 資源(保留 WebSocket...")
debug_log(f"同步清理會話 {self.session_id} 資源...")
# 只清理進程,不清理 WebSocket 連接
if self.process: if self.process:
try: try:
self.process.terminate() self.process.terminate()
@ -321,7 +375,30 @@ class WebFeedbackSession:
self.process.kill() self.process.kill()
except: except:
pass pass
self.process = None self.process = None
# 清理臨時數據
self.command_logs.clear()
# 注意:不設置 _cleanup_done = True因為還需要清理 WebSocket
def cleanup(self):
"""同步清理會話資源(保持向後兼容)"""
if self._cleanup_done:
return
self._cleanup_done = True
debug_log(f"同步清理會話 {self.session_id} 資源...")
if self.process:
try:
self.process.terminate()
self.process.wait(timeout=5)
except:
try:
self.process.kill()
except:
pass
self.process = None
# 設置完成事件 # 設置完成事件
self.feedback_completed.set() self.feedback_completed.set()

View File

@ -9,6 +9,7 @@
import json import json
import os import os
import time
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
@ -24,32 +25,30 @@ if TYPE_CHECKING:
def setup_routes(manager: 'WebUIManager'): def setup_routes(manager: 'WebUIManager'):
"""設置路由""" """設置路由"""
@manager.app.get("/", response_class=HTMLResponse) @manager.app.get("/", response_class=HTMLResponse)
async def index(request: Request): async def index(request: Request):
"""首頁""" """統一回饋頁面 - 重構後的主頁面"""
return manager.templates.TemplateResponse("index.html", { # 獲取當前活躍會話
"request": request, current_session = manager.get_current_session()
"title": "MCP Feedback Enhanced"
})
@manager.app.get("/session/{session_id}", response_class=HTMLResponse) if not current_session:
async def feedback_session(request: Request, session_id: str): # 沒有活躍會話時顯示等待頁面
"""回饋會話頁面""" return manager.templates.TemplateResponse("index.html", {
session = manager.get_session(session_id) "request": request,
if not session: "title": "MCP Feedback Enhanced",
return JSONResponse( "has_session": False,
status_code=404, "version": __version__
content={"error": "會話不存在"} })
)
# 有活躍會話時顯示回饋頁面
return manager.templates.TemplateResponse("feedback.html", { return manager.templates.TemplateResponse("feedback.html", {
"request": request, "request": request,
"session_id": session_id, "project_directory": current_session.project_directory,
"project_directory": session.project_directory, "summary": current_session.summary,
"summary": session.summary,
"title": "Interactive Feedback - 回饋收集", "title": "Interactive Feedback - 回饋收集",
"version": __version__ "version": __version__,
"has_session": True
}) })
@manager.app.get("/api/translations") @manager.app.get("/api/translations")
@ -81,27 +80,93 @@ def setup_routes(manager: 'WebUIManager'):
debug_log(f"Web 翻譯 API 返回 {len(translations)} 種語言的數據") debug_log(f"Web 翻譯 API 返回 {len(translations)} 種語言的數據")
return JSONResponse(content=translations) return JSONResponse(content=translations)
@manager.app.websocket("/ws/{session_id}") @manager.app.get("/api/session-status")
async def websocket_endpoint(websocket: WebSocket, session_id: str): async def get_session_status():
"""WebSocket 端點""" """獲取當前會話狀態"""
session = manager.get_session(session_id) current_session = manager.get_current_session()
if not current_session:
return JSONResponse(content={
"has_session": False,
"status": "no_session",
"message": "沒有活躍會話"
})
return JSONResponse(content={
"has_session": True,
"status": "active",
"session_info": {
"project_directory": current_session.project_directory,
"summary": current_session.summary,
"feedback_completed": current_session.feedback_completed.is_set()
}
})
@manager.app.get("/api/current-session")
async def get_current_session():
"""獲取當前會話詳細信息"""
current_session = manager.get_current_session()
if not current_session:
return JSONResponse(
status_code=404,
content={"error": "沒有活躍會話"}
)
return JSONResponse(content={
"project_directory": current_session.project_directory,
"summary": current_session.summary,
"feedback_completed": current_session.feedback_completed.is_set(),
"command_logs": current_session.command_logs,
"images_count": len(current_session.images)
})
@manager.app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket 端點 - 重構後移除 session_id 依賴"""
# 獲取當前活躍會話
session = manager.get_current_session()
if not session: if not session:
await websocket.close(code=4004, reason="會話不存在") await websocket.close(code=4004, reason="沒有活躍會話")
return return
await websocket.accept() await websocket.accept()
session.websocket = websocket session.websocket = websocket
debug_log(f"WebSocket 連接建立: {session_id}") debug_log(f"WebSocket 連接建立: 當前活躍會話")
# 發送連接成功消息
try:
await websocket.send_json({
"type": "connection_established",
"message": "WebSocket 連接已建立"
})
# 檢查是否有待發送的會話更新
if getattr(manager, '_pending_session_update', False):
await websocket.send_json({
"type": "session_updated",
"message": "新會話已創建,正在更新頁面內容",
"session_info": {
"project_directory": session.project_directory,
"summary": session.summary,
"session_id": session.session_id
}
})
manager._pending_session_update = False
debug_log("已發送會話更新通知到前端")
except Exception as e:
debug_log(f"發送連接確認失敗: {e}")
try: try:
while True: while True:
data = await websocket.receive_text() data = await websocket.receive_text()
message = json.loads(data) message = json.loads(data)
await handle_websocket_message(manager, session, message) await handle_websocket_message(manager, session, message)
except WebSocketDisconnect: except WebSocketDisconnect:
debug_log(f"WebSocket 連接斷開: {session_id}") debug_log(f"WebSocket 連接斷開")
except Exception as e: except Exception as e:
debug_log(f"WebSocket 錯誤: {e}") debug_log(f"WebSocket 錯誤: {e}")
finally: finally:
@ -181,35 +246,154 @@ def setup_routes(manager: 'WebUIManager'):
content={"status": "error", "message": f"清除失敗: {str(e)}"} content={"status": "error", "message": f"清除失敗: {str(e)}"}
) )
@manager.app.get("/api/active-tabs")
async def get_active_tabs():
"""獲取活躍標籤頁信息 - 優先使用全局狀態"""
current_time = time.time()
expired_threshold = 60
# 清理過期的全局標籤頁
valid_global_tabs = {}
for tab_id, tab_info in manager.global_active_tabs.items():
if current_time - tab_info.get('last_seen', 0) <= expired_threshold:
valid_global_tabs[tab_id] = tab_info
manager.global_active_tabs = valid_global_tabs
# 如果有當前會話,也更新會話的標籤頁狀態
current_session = manager.get_current_session()
if current_session:
# 合併會話標籤頁到全局(如果有的話)
session_tabs = getattr(current_session, 'active_tabs', {})
for tab_id, tab_info in session_tabs.items():
if current_time - tab_info.get('last_seen', 0) <= expired_threshold:
valid_global_tabs[tab_id] = tab_info
# 更新會話的活躍標籤頁
current_session.active_tabs = valid_global_tabs.copy()
manager.global_active_tabs = valid_global_tabs
return JSONResponse(content={
"has_session": current_session is not None,
"active_tabs": valid_global_tabs,
"count": len(valid_global_tabs)
})
@manager.app.post("/api/register-tab")
async def register_tab(request: Request):
"""註冊新標籤頁"""
try:
data = await request.json()
tab_id = data.get("tabId")
if not tab_id:
return JSONResponse(
status_code=400,
content={"error": "缺少 tabId"}
)
current_session = manager.get_current_session()
if not current_session:
return JSONResponse(
status_code=404,
content={"error": "沒有活躍會話"}
)
# 註冊標籤頁
tab_info = {
'timestamp': time.time() * 1000, # 毫秒時間戳
'last_seen': time.time(),
'registered_at': time.time()
}
if not hasattr(current_session, 'active_tabs'):
current_session.active_tabs = {}
current_session.active_tabs[tab_id] = tab_info
# 同時更新全局標籤頁狀態
manager.global_active_tabs[tab_id] = tab_info
debug_log(f"標籤頁已註冊: {tab_id}")
return JSONResponse(content={
"status": "success",
"tabId": tab_id,
"registered": True
})
except Exception as e:
debug_log(f"註冊標籤頁失敗: {e}")
return JSONResponse(
status_code=500,
content={"error": f"註冊失敗: {str(e)}"}
)
async def handle_websocket_message(manager: 'WebUIManager', session, data: dict): async def handle_websocket_message(manager: 'WebUIManager', session, data: dict):
"""處理 WebSocket 消息""" """處理 WebSocket 消息"""
message_type = data.get("type") message_type = data.get("type")
if message_type == "submit_feedback": if message_type == "submit_feedback":
# 提交回饋 # 提交回饋
feedback = data.get("feedback", "") feedback = data.get("feedback", "")
images = data.get("images", []) images = data.get("images", [])
settings = data.get("settings", {}) settings = data.get("settings", {})
await session.submit_feedback(feedback, images, settings) await session.submit_feedback(feedback, images, settings)
elif message_type == "run_command": elif message_type == "run_command":
# 執行命令 # 執行命令
command = data.get("command", "") command = data.get("command", "")
if command.strip(): if command.strip():
await session.run_command(command) await session.run_command(command)
elif message_type == "get_status":
# 獲取會話狀態
if session.websocket:
try:
await session.websocket.send_json({
"type": "status_update",
"status_info": session.get_status_info()
})
except Exception as e:
debug_log(f"發送狀態更新失敗: {e}")
elif message_type == "heartbeat":
# WebSocket 心跳處理
tab_id = data.get("tabId", "unknown")
timestamp = data.get("timestamp", 0)
tab_info = {
'timestamp': timestamp,
'last_seen': time.time()
}
# 更新會話的標籤頁信息
if hasattr(session, 'active_tabs'):
session.active_tabs[tab_id] = tab_info
else:
session.active_tabs = {tab_id: tab_info}
# 同時更新全局標籤頁狀態
manager.global_active_tabs[tab_id] = tab_info
# 發送心跳回應
if session.websocket:
try:
await session.websocket.send_json({
"type": "heartbeat_response",
"tabId": tab_id,
"timestamp": timestamp
})
except Exception as e:
debug_log(f"發送心跳回應失敗: {e}")
elif message_type == "user_timeout": elif message_type == "user_timeout":
# 用戶設置的超時已到 # 用戶設置的超時已到
debug_log(f"收到用戶超時通知: {session.session_id}") debug_log(f"收到用戶超時通知: {session.session_id}")
# 清理會話資源 # 清理會話資源
await session._cleanup_resources_on_timeout() await session._cleanup_resources_on_timeout()
# 如果沒有其他活躍會話,停止服務器 # 重構:不再自動停止服務器,保持服務器運行以支援持久性
if len(manager.sessions) <= 1: # 當前會話即將被移除
debug_log("用戶超時,沒有其他活躍會話,準備停止服務器")
# 延遲停止服務器,給前端時間關閉
import asyncio
asyncio.create_task(_delayed_server_stop(manager))
else: else:
debug_log(f"未知的消息類型: {message_type}") debug_log(f"未知的消息類型: {message_type}")

File diff suppressed because it is too large Load Diff

View File

@ -828,6 +828,33 @@
background: rgba(0, 122, 204, 0.15); background: rgba(0, 122, 204, 0.15);
} }
/* 佈局模式樣式 */
/* 預設分離模式 - 顯示回饋和AI摘要頁籤隱藏合併模式頁籤 */
body.layout-separate .tab-button[data-tab="combined"] {
display: none;
}
body.layout-separate .tab-button[data-tab="feedback"],
body.layout-separate .tab-button[data-tab="summary"] {
display: inline-block;
}
/* 合併模式 - 顯示合併模式頁籤隱藏回饋和AI摘要頁籤 */
body.layout-combined-vertical .tab-button[data-tab="combined"],
body.layout-combined-horizontal .tab-button[data-tab="combined"] {
display: inline-block;
}
body.layout-combined-vertical .tab-button[data-tab="feedback"],
body.layout-combined-vertical .tab-button[data-tab="summary"],
body.layout-combined-horizontal .tab-button[data-tab="feedback"],
body.layout-combined-horizontal .tab-button[data-tab="summary"] {
display: none;
}
/* 合併模式分頁的水平佈局樣式 */ /* 合併模式分頁的水平佈局樣式 */
#tab-combined.active.combined-horizontal .combined-content { #tab-combined.active.combined-horizontal .combined-content {
display: flex !important; display: flex !important;
@ -1025,6 +1052,74 @@
.compatibility-hint-btn:hover { .compatibility-hint-btn:hover {
background: #1976d2; background: #1976d2;
} }
/* 回饋狀態指示器樣式 */
.feedback-status-indicator {
display: flex;
align-items: center;
padding: 12px 16px;
margin: 16px 0;
border-radius: 8px;
border: 1px solid;
background: var(--card-bg);
transition: all 0.3s ease;
}
.feedback-status-indicator .status-icon {
font-size: 24px;
margin-right: 12px;
min-width: 32px;
text-align: center;
}
.feedback-status-indicator .status-text {
flex: 1;
}
.feedback-status-indicator .status-text strong {
display: block;
font-size: 16px;
margin-bottom: 4px;
}
.feedback-status-indicator .status-text span {
font-size: 14px;
opacity: 0.8;
}
.feedback-status-indicator.status-waiting {
border-color: var(--accent-color);
background: rgba(74, 144, 226, 0.1);
}
.feedback-status-indicator.status-processing {
border-color: #ffa500;
background: rgba(255, 165, 0, 0.1);
animation: pulse 2s infinite;
}
.feedback-status-indicator.status-submitted {
border-color: var(--success-color);
background: rgba(40, 167, 69, 0.1);
}
@keyframes pulse {
0% { opacity: 1; }
50% { opacity: 0.7; }
100% { opacity: 1; }
}
/* 禁用狀態的樣式 */
.image-upload-area.disabled {
opacity: 0.5;
pointer-events: none;
cursor: not-allowed;
}
.text-input:disabled {
opacity: 0.6;
cursor: not-allowed;
}
</style> </style>
</head> </head>
<body> <body>
@ -1073,6 +1168,8 @@
</div> </div>
</div> </div>
<!-- 回饋分頁 --> <!-- 回饋分頁 -->
<div id="tab-feedback" class="tab-content active"> <div id="tab-feedback" class="tab-content active">
<div class="section-description" data-i18n="feedback.description"> <div class="section-description" data-i18n="feedback.description">
@ -1209,14 +1306,27 @@
<!-- 回饋輸入區域 --> <!-- 回饋輸入區域 -->
<div class="combined-section"> <div class="combined-section">
<h3 class="combined-section-title" data-i18n="combined.feedbackTitle">💬 提供回饋</h3> <h3 class="combined-section-title" data-i18n="combined.feedbackTitle">💬 提供回饋</h3>
<!-- 等待回饋狀態指示器 -->
<div id="combinedFeedbackStatusIndicator" class="feedback-status-indicator status-waiting" style="display: none;">
<div class="status-icon"></div>
<div class="status-text">
<strong data-i18n="feedback.status.waiting.title">等待您的回饋</strong>
<span data-i18n="feedback.status.waiting.message">請提供您對 AI 工作成果的意見和建議</span>
</div>
</div>
<div class="input-group"> <div class="input-group">
<label class="input-label" data-i18n="feedback.textLabel">文字回饋</label> <label class="input-label" data-i18n="feedback.textLabel">文字回饋</label>
<textarea <textarea
id="combinedFeedbackText" id="combinedFeedbackText"
class="text-input" class="text-input"
data-i18n-placeholder="feedback.detailedPlaceholder" data-i18n-placeholder="feedback.detailedPlaceholder"
placeholder="請在這裡輸入您的回饋..." placeholder="請在這裡輸入您的回饋...
💡 小提示:
• 按 Ctrl+Enter/Cmd+Enter (支援數字鍵盤) 可快速提交
• 按 Ctrl+V/Cmd+V 可直接貼上剪貼板圖片"
style="min-height: 150px;" style="min-height: 150px;"
></textarea> ></textarea>
</div> </div>
@ -1510,8 +1620,8 @@
</div> </div>
<!-- WebSocket 和 JavaScript --> <!-- WebSocket 和 JavaScript -->
<script src="/static/js/i18n.js"></script> <script src="/static/js/i18n.js?v=2025010510"></script>
<script src="/static/js/app.js"></script> <script src="/static/js/app.js?v=2025010510"></script>
<script> <script>
// 等待 I18nManager 初始化完成後再初始化 FeedbackApp // 等待 I18nManager 初始化完成後再初始化 FeedbackApp
async function initializeApp() { async function initializeApp() {

View File

@ -1,16 +1,26 @@
<!DOCTYPE html> <!DOCTYPE html>
<html lang="zh-TW"> <html lang="zh-TW" id="html-root">
<head> <head>
<meta charset="UTF-8"> <meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0"> <meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{{ title }}</title> <title>{{ title }}</title>
<link rel="stylesheet" href="/static/css/styles.css">
<style> <style>
:root { :root {
/* 深色主題顏色變數 */
--bg-primary: #1e1e1e; --bg-primary: #1e1e1e;
--bg-secondary: #2d2d30; --bg-secondary: #2d2d30;
--bg-tertiary: #252526;
--surface-color: #333333;
--text-primary: #cccccc; --text-primary: #cccccc;
--text-secondary: #9e9e9e; --text-secondary: #9e9e9e;
--accent-color: #007acc; --accent-color: #007acc;
--accent-hover: #005a9e;
--border-color: #464647;
--success-color: #4caf50;
--warning-color: #ff9800;
--error-color: #f44336;
--info-color: #2196f3;
} }
* { * {
@ -26,12 +36,19 @@
line-height: 1.6; line-height: 1.6;
min-height: 100vh; min-height: 100vh;
display: flex; display: flex;
align-items: center; flex-direction: column;
justify-content: center;
} }
.container { /* 等待會話時的樣式 */
.waiting-container {
display: flex;
align-items: center;
justify-content: center;
min-height: 100vh;
text-align: center; text-align: center;
}
.waiting-content {
max-width: 600px; max-width: 600px;
padding: 40px; padding: 40px;
background: var(--bg-secondary); background: var(--bg-secondary);
@ -39,37 +56,267 @@
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.3); box-shadow: 0 8px 32px rgba(0, 0, 0, 0.3);
} }
.title { .waiting-title {
font-size: 2.5em; font-size: 2.5em;
font-weight: bold; font-weight: bold;
color: var(--accent-color); color: var(--accent-color);
margin-bottom: 20px; margin-bottom: 20px;
} }
.description { .waiting-description {
color: var(--text-secondary); color: var(--text-secondary);
font-size: 1.2em; font-size: 1.2em;
margin-bottom: 30px; margin-bottom: 30px;
} }
.status { .waiting-status {
padding: 20px; padding: 20px;
background: rgba(0, 122, 204, 0.1); background: rgba(0, 122, 204, 0.1);
border: 1px solid var(--accent-color); border: 1px solid var(--accent-color);
border-radius: 8px; border-radius: 8px;
color: var(--accent-color); color: var(--accent-color);
} }
/* 連接狀態指示器 */
.connection-status {
position: fixed;
top: 20px;
right: 20px;
z-index: 1000;
display: flex;
align-items: center;
gap: 8px;
padding: 8px 12px;
background: var(--bg-secondary);
border: 1px solid var(--border-color);
border-radius: 6px;
font-size: 12px;
font-weight: 500;
}
.connection-indicator {
display: inline-block;
width: 8px;
height: 8px;
border-radius: 50%;
background: var(--error-color);
transition: all 0.3s ease;
}
.connection-indicator.connected {
background: var(--success-color);
box-shadow: 0 0 8px rgba(76, 175, 80, 0.5);
}
.connection-indicator.connecting {
background: var(--warning-color);
animation: pulse 1s infinite;
}
.connection-indicator.error {
background: var(--error-color);
}
.connection-indicator.disconnected {
background: var(--text-secondary);
}
@keyframes pulse {
0%, 100% { opacity: 1; }
50% { opacity: 0.5; }
}
/* 主容器 - 有會話時顯示 */
.main-container {
display: none;
max-width: 1200px;
width: 100%;
margin: 0 auto;
padding: 20px;
background: var(--bg-primary);
color: var(--text-primary);
min-height: 100vh;
flex-direction: column;
}
.main-container.active {
display: flex;
}
/* 回饋界面樣式 */
.feedback-container {
width: 100%;
max-width: 1000px;
margin: 0 auto;
}
.header {
background: var(--bg-secondary);
border-bottom: 1px solid var(--border-color);
padding: 15px 0;
margin-bottom: 20px;
border-radius: 8px 8px 0 0;
}
.header-content {
display: flex;
justify-content: space-between;
align-items: center;
padding: 0 20px;
}
.title {
font-size: 24px;
font-weight: bold;
color: var(--accent-color);
margin: 0;
}
.project-info {
color: var(--text-secondary);
font-size: 14px;
}
.ai-summary-section,
.feedback-section,
.command-section {
background: var(--bg-secondary);
border: 1px solid var(--border-color);
border-radius: 8px;
padding: 20px;
margin-bottom: 20px;
}
.ai-summary-section h2,
.feedback-section h3,
.command-section h3 {
color: var(--accent-color);
margin-bottom: 16px;
margin-top: 0;
}
.input-group {
margin-bottom: 16px;
}
.input-label {
display: block;
font-weight: 500;
margin-bottom: 8px;
color: var(--text-primary);
}
.text-input,
.command-input-line {
width: 100%;
background: var(--bg-primary);
border: 1px solid var(--border-color);
border-radius: 6px;
padding: 12px;
color: var(--text-primary);
font-size: 14px;
line-height: 1.5;
resize: vertical;
font-family: inherit;
transition: border-color 0.3s ease;
box-sizing: border-box;
}
.text-input:focus,
.command-input-line:focus {
outline: none;
border-color: var(--accent-color);
}
.command-input-line {
font-family: 'Consolas', 'Monaco', 'Courier New', monospace;
}
.command-output {
background: #0f0f0f;
border: 2px solid var(--border-color);
border-radius: 6px;
padding: 12px;
font-family: 'Consolas', 'Monaco', 'Courier New', monospace;
font-size: 14px;
line-height: 1.4;
color: #00ff00;
text-shadow: 0 0 5px #00ff00;
white-space: pre-wrap;
overflow-y: auto;
width: 100%;
box-sizing: border-box;
}
.btn {
padding: 10px 20px;
border: none;
border-radius: 6px;
cursor: pointer;
font-size: 14px;
font-weight: 500;
transition: all 0.3s ease;
text-decoration: none;
display: inline-flex;
align-items: center;
justify-content: center;
gap: 8px;
margin-right: 10px;
}
.btn-primary {
background: var(--accent-color);
color: white;
}
.btn-primary:hover {
background: var(--accent-hover);
}
.btn-secondary {
background: var(--surface-color);
color: var(--text-primary);
border: 1px solid var(--border-color);
}
.btn-secondary:hover {
background: var(--bg-tertiary);
}
.button-group {
display: flex;
gap: 10px;
margin-top: 16px;
}
</style> </style>
</head> </head>
<body> <body>
<div class="container"> <!-- 連接狀態指示器 -->
<h1 class="title">MCP Feedback Enhanced</h1> <div class="connection-status">
<p class="description"> <div class="connection-indicator" id="connectionIndicator"></div>
Web UI 互動式回饋收集工具 <span id="connectionText">連接中...</span>
</p> </div>
<div class="status">
Web UI 服務已啟動。請等待會話建立或直接訪問具體的會話 URL。 <!-- 等待會話的頁面 -->
<div class="waiting-container" id="waitingContainer">
<div class="waiting-content">
<h1 class="waiting-title">MCP Feedback Enhanced</h1>
<p class="waiting-description">
Web UI 互動式回饋收集工具
</p>
<div class="waiting-status">
等待 MCP 服務調用以建立回饋會話...
</div>
</div> </div>
</div> </div>
<!-- 主要回饋界面 -->
<div class="main-container" id="mainContainer">
<!-- 這裡將動態載入回饋界面內容 -->
</div>
<!-- JavaScript -->
<script src="/static/js/i18n.js?v=2025010505"></script>
<script src="/static/js/app.js?v=2025010505"></script>
</body> </body>
</html> </html>