更新測試用例,新增對 timeout 和 force_web_ui 參數的測試,並改善環境檢測功能的輸出信息。重構 Web UI 以支援圖片上傳和回饋提交,提升用戶體驗。

This commit is contained in:
Minidoracat 2025-05-31 02:02:51 +08:00
parent 918428dd45
commit 4bce2c30f2
3 changed files with 1265 additions and 446 deletions

File diff suppressed because it is too large Load Diff

View File

@ -119,16 +119,16 @@ def test_environment_detection():
print("-" * 30)
try:
from server import is_ssh_session, can_use_gui
from server import is_remote_environment, can_use_gui
ssh_detected = is_ssh_session()
remote_detected = is_remote_environment()
gui_available = can_use_gui()
print(f"SSH 環境檢測: {'' if ssh_detected else ''}")
print(f"遠端環境檢測: {'' if remote_detected else ''}")
print(f"GUI 可用性: {'' if gui_available else ''}")
if ssh_detected:
print("✅ 將使用 Web UI (適合 SSH remote 開發)")
if remote_detected:
print("✅ 將使用 Web UI (適合遠端開發環境)")
else:
print("✅ 將使用 Qt GUI (本地環境)")
@ -147,6 +147,12 @@ def test_mcp_integration():
from server import interactive_feedback
print("✅ MCP 工具函數可用")
# Test timeout parameter
print("✅ 支援 timeout 參數")
# Test force_web_ui parameter
print("✅ 支援 force_web_ui 參數")
# Test would require actual MCP call, so just verify import
print("✅ 準備接受來自 AI 助手的調用")
return True
@ -155,6 +161,67 @@ def test_mcp_integration():
print(f"❌ MCP 整合測試失敗: {e}")
return False
def test_new_parameters():
"""Test new timeout and force_web_ui parameters"""
print("\n🆕 測試新增參數功能")
print("-" * 30)
try:
from server import interactive_feedback
# 測試參數是否存在
import inspect
sig = inspect.signature(interactive_feedback)
# 檢查 timeout 參數
if 'timeout' in sig.parameters:
timeout_param = sig.parameters['timeout']
print(f"✅ timeout 參數存在,預設值: {timeout_param.default}")
else:
print("❌ timeout 參數不存在")
return False
# 檢查 force_web_ui 參數
if 'force_web_ui' in sig.parameters:
force_web_ui_param = sig.parameters['force_web_ui']
print(f"✅ force_web_ui 參數存在,預設值: {force_web_ui_param.default}")
else:
print("❌ force_web_ui 參數不存在")
return False
print("✅ 所有新參數功能正常")
return True
except Exception as e:
print(f"❌ 新參數測試失敗: {e}")
return False
def test_force_web_ui_mode():
"""Test force web UI mode"""
print("\n🌐 測試強制 Web UI 模式")
print("-" * 30)
try:
from server import interactive_feedback, is_remote_environment, can_use_gui
# 顯示當前環境狀態
is_remote = is_remote_environment()
gui_available = can_use_gui()
print(f"當前環境 - 遠端: {is_remote}, GUI 可用: {gui_available}")
if not is_remote and gui_available:
print("✅ 在本地 GUI 環境中可以使用 force_web_ui=True 強制使用 Web UI")
print("💡 這對於測試 Web UI 功能非常有用")
else:
print(" 當前環境會自動使用 Web UI")
return True
except Exception as e:
print(f"❌ 強制 Web UI 測試失敗: {e}")
return False
def interactive_demo(session_info):
"""Run interactive demo with the Web UI"""
print(f"\n🌐 Web UI 持久化運行模式")
@ -204,6 +271,12 @@ if __name__ == "__main__":
# Test environment detection
env_test = test_environment_detection()
# Test new parameters
params_test = test_new_parameters()
# Test force web UI mode
force_test = test_force_web_ui_mode()
# Test MCP integration
mcp_test = test_mcp_integration()
@ -211,7 +284,7 @@ if __name__ == "__main__":
web_test, session_info = test_web_ui()
print("\n" + "=" * 60)
if env_test and mcp_test and web_test:
if env_test and params_test and force_test and mcp_test and web_test:
print("🎊 所有測試完成!準備使用 Interactive Feedback MCP")
print("\n📖 使用方法:")
print(" 1. 在 Cursor/Cline 中配置此 MCP 服務器")

692
web_ui.py
View File

@ -1,6 +1,17 @@
# Interactive Feedback MCP Web UI
# Developed by Fábio Ferreira (https://x.com/fabiomlferreira)
# Web UI version for SSH remote development
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
互動式回饋收集 Web UI
=====================
基於 FastAPI Web 用戶介面專為 SSH 遠端開發環境設計
支援文字輸入圖片上傳命令執行等功能
作者: Fábio Ferreira
靈感來源: dotcursorrules.com
增強功能: 圖片支援和現代化界面設計
"""
import os
import sys
import json
@ -11,285 +22,327 @@ import threading
import subprocess
import psutil
import time
import base64
import tempfile
from typing import Dict, Optional, List
from pathlib import Path
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request, UploadFile, File, Form
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.templating import Jinja2Templates
import uvicorn
# ===== 常數定義 =====
MAX_IMAGE_SIZE = 1 * 1024 * 1024 # 1MB 圖片大小限制
SUPPORTED_IMAGE_TYPES = {'image/png', 'image/jpeg', 'image/jpg', 'image/gif', 'image/bmp', 'image/webp'}
TEMP_DIR = Path.home() / ".cache" / "interactive-feedback-mcp-web"
# ===== Web 回饋會話類 =====
class WebFeedbackSession:
"""Web 回饋會話管理"""
def __init__(self, session_id: str, project_directory: str, summary: str):
self.session_id = session_id
self.project_directory = project_directory
self.summary = summary
self.websocket: Optional[WebSocket] = None
self.feedback_result: Optional[str] = None
self.command_logs: List[str] = []
self.images: List[dict] = []
self.feedback_completed = threading.Event()
self.process: Optional[subprocess.Popen] = None
self.completed = False
self.config = {
"run_command": "",
"execute_automatically": False
}
self.command_logs = []
# 確保臨時目錄存在
TEMP_DIR.mkdir(parents=True, exist_ok=True)
async def wait_for_feedback(self, timeout: int = 600) -> dict:
"""
等待用戶回饋包含圖片
Args:
timeout: 超時時間
Returns:
dict: 回饋結果
"""
loop = asyncio.get_event_loop()
def wait_in_thread():
return self.feedback_completed.wait(timeout)
completed = await loop.run_in_executor(None, wait_in_thread)
if completed:
return {
"logs": "\n".join(self.command_logs),
"interactive_feedback": self.feedback_result or "",
"images": self.images
}
else:
raise TimeoutError("等待用戶回饋超時")
async def submit_feedback(self, feedback: str, images: List[dict]):
"""
提交回饋和圖片
Args:
feedback: 文字回饋
images: 圖片列表
"""
self.feedback_result = feedback
self.images = self._process_images(images)
self.feedback_completed.set()
if self.websocket:
try:
await self.websocket.close()
except:
pass
def _process_images(self, images: List[dict]) -> List[dict]:
"""
處理圖片數據轉換為統一格式
Args:
images: 原始圖片數據列表
Returns:
List[dict]: 處理後的圖片數據
"""
processed_images = []
for img in images:
try:
if not all(key in img for key in ["name", "data", "size"]):
continue
# 檢查文件大小
if img["size"] > MAX_IMAGE_SIZE:
print(f"[DEBUG] 圖片 {img['name']} 超過大小限制,跳過")
continue
# 解碼 base64 數據
if isinstance(img["data"], str):
try:
image_bytes = base64.b64decode(img["data"])
except Exception as e:
print(f"[DEBUG] 圖片 {img['name']} base64 解碼失敗: {e}")
continue
else:
image_bytes = img["data"]
if len(image_bytes) == 0:
print(f"[DEBUG] 圖片 {img['name']} 數據為空,跳過")
continue
processed_images.append({
"name": img["name"],
"data": image_bytes, # 保存原始 bytes 數據
"size": len(image_bytes)
})
print(f"[DEBUG] 圖片 {img['name']} 處理成功,大小: {len(image_bytes)} bytes")
except Exception as e:
print(f"[DEBUG] 圖片處理錯誤: {e}")
continue
return processed_images
def add_log(self, log_entry: str):
"""添加命令日誌"""
self.command_logs.append(log_entry)
async def run_command(self, command: str):
"""執行命令並透過 WebSocket 發送輸出"""
if self.process:
# 終止現有進程
try:
self.process.terminate()
self.process.wait(timeout=5)
except:
try:
self.process.kill()
except:
pass
self.process = None
try:
self.process = subprocess.Popen(
command,
shell=True,
cwd=self.project_directory,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
universal_newlines=True
)
# 在背景線程中讀取輸出
def read_output():
try:
for line in iter(self.process.stdout.readline, ''):
self.add_log(line.rstrip())
if self.websocket:
asyncio.run_coroutine_threadsafe(
self.websocket.send_json({
"type": "command_output",
"output": line
}),
asyncio.get_event_loop()
)
# 等待進程完成
exit_code = self.process.wait()
if self.websocket:
asyncio.run_coroutine_threadsafe(
self.websocket.send_json({
"type": "command_finished",
"exit_code": exit_code
}),
asyncio.get_event_loop()
)
except Exception as e:
print(f"命令執行錯誤: {e}")
finally:
self.process = None
thread = threading.Thread(target=read_output, daemon=True)
thread.start()
except Exception as e:
error_msg = f"命令執行失敗: {str(e)}\n"
self.add_log(error_msg)
if self.websocket:
await self.websocket.send_json({
"type": "command_output",
"output": error_msg
})
# ===== Web UI 管理器 =====
class WebUIManager:
"""Web UI 管理器"""
def __init__(self, host: str = "127.0.0.1", port: int = 8765):
self.host = host
self.port = port
self.app = FastAPI(title="Interactive Feedback MCP")
self.app = FastAPI(title="Interactive Feedback MCP Web UI")
self.sessions: Dict[str, WebFeedbackSession] = {}
self.server_process = None
self.server_thread: Optional[threading.Thread] = None
self.setup_routes()
# Setup static files and templates
script_dir = Path(__file__).parent
static_dir = script_dir / "static"
templates_dir = script_dir / "templates"
static_dir.mkdir(exist_ok=True)
templates_dir.mkdir(exist_ok=True)
self.app.mount("/static", StaticFiles(directory=static_dir), name="static")
self.templates = Jinja2Templates(directory=templates_dir)
def setup_routes(self):
"""設置路由"""
# 確保靜態文件目錄存在
static_dir = Path("static")
templates_dir = Path("templates")
# 靜態文件
if static_dir.exists():
self.app.mount("/static", StaticFiles(directory="static"), name="static")
# 模板
templates = Jinja2Templates(directory="templates") if templates_dir.exists() else None
@self.app.get("/", response_class=HTMLResponse)
async def index(request: Request):
return self.templates.TemplateResponse("index.html", {"request": request})
"""首頁"""
if templates:
return templates.TemplateResponse("index.html", {"request": request})
else:
return HTMLResponse(self._get_simple_index_html())
@self.app.get("/session/{session_id}", response_class=HTMLResponse)
async def session_page(request: Request, session_id: str):
@self.app.get("/session/{session_id}", response_class=HTMLResponse)
async def feedback_session(request: Request, session_id: str):
"""回饋會話頁面"""
session = self.sessions.get(session_id)
if not session:
return HTMLResponse("Session not found", status_code=404)
return HTMLResponse("會話不存在", status_code=404)
return self.templates.TemplateResponse("feedback.html", {
"request": request,
"session_id": session_id,
"project_directory": session.project_directory,
"summary": session.summary
})
if templates:
return templates.TemplateResponse("feedback.html", {
"request": request,
"session_id": session_id,
"project_directory": session.project_directory,
"summary": session.summary
})
else:
return HTMLResponse(self._get_simple_feedback_html(session_id, session))
@self.app.websocket("/ws/{session_id}")
async def websocket_endpoint(websocket: WebSocket, session_id: str):
await websocket.accept()
"""WebSocket 連接處理"""
session = self.sessions.get(session_id)
if not session:
await websocket.close(code=4000, reason="Session not found")
await websocket.close(code=4004, reason="會話不存在")
return
await websocket.accept()
session.websocket = websocket
# Send initial data
await websocket.send_json({
"type": "init",
"project_directory": session.project_directory,
"summary": session.summary,
"config": session.config,
"logs": session.command_logs
})
try:
while True:
data = await websocket.receive_json()
await self.handle_websocket_message(session, data)
except WebSocketDisconnect:
print(f"WebSocket 斷開連接: {session_id}")
except Exception as e:
print(f"WebSocket 錯誤: {e}")
finally:
session.websocket = None
@self.app.post("/api/complete/{session_id}")
async def complete_session(session_id: str, feedback_data: dict):
session = self.sessions.get(session_id)
if not session:
return {"error": "Session not found"}
session.feedback_result = feedback_data.get("feedback", "")
session.completed = True
return {"success": True}
async def handle_websocket_message(self, session: WebFeedbackSession, data: dict):
"""處理 WebSocket 消息"""
message_type = data.get("type")
if message_type == "run_command":
command = data.get("command", "")
await self.run_command(session, command)
elif message_type == "stop_command":
await self.stop_command(session)
command = data.get("command", "").strip()
if command:
await session.run_command(command)
elif message_type == "submit_feedback":
feedback = data.get("feedback", "")
session.feedback_result = feedback
session.completed = True
images = data.get("images", [])
await session.submit_feedback(feedback, images)
await session.websocket.send_json({
"type": "feedback_submitted",
"message": "Feedback submitted successfully"
})
elif message_type == "update_config":
session.config.update(data.get("config", {}))
elif message_type == "clear_logs":
session.command_logs.clear()
await session.websocket.send_json({
"type": "logs_cleared"
})
async def run_command(self, session: WebFeedbackSession, command: str):
if session.process:
await self.stop_command(session)
if not command.strip():
await session.websocket.send_json({
"type": "log",
"data": "Please enter a command to run\n"
})
return
session.command_logs.append(f"$ {command}\n")
await session.websocket.send_json({
"type": "log",
"data": f"$ {command}\n"
})
try:
session.process = subprocess.Popen(
command,
shell=True,
cwd=session.project_directory,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
encoding="utf-8",
errors="ignore"
)
# Start threads to read output
threading.Thread(
target=self.read_process_output,
args=(session, session.process.stdout),
daemon=True
).start()
threading.Thread(
target=self.read_process_output,
args=(session, session.process.stderr),
daemon=True
).start()
# Monitor process completion
threading.Thread(
target=self.monitor_process,
args=(session,),
daemon=True
).start()
except Exception as e:
error_msg = f"Error running command: {str(e)}\n"
session.command_logs.append(error_msg)
await session.websocket.send_json({
"type": "log",
"data": error_msg
})
def read_process_output(self, session: WebFeedbackSession, pipe):
try:
for line in iter(pipe.readline, ""):
if not line:
break
session.command_logs.append(line)
if session.websocket:
# Use threading to send async message
threading.Thread(
target=self._send_websocket_message,
args=(session.websocket, {
"type": "log",
"data": line
}),
daemon=True
).start()
except Exception:
pass
def monitor_process(self, session: WebFeedbackSession):
if session.process:
exit_code = session.process.wait()
completion_msg = f"\nProcess exited with code {exit_code}\n"
session.command_logs.append(completion_msg)
if session.websocket:
threading.Thread(
target=self._send_websocket_message,
args=(session.websocket, {
"type": "log",
"data": completion_msg
}),
daemon=True
).start()
threading.Thread(
target=self._send_websocket_message,
args=(session.websocket, {
"type": "process_completed",
"exit_code": exit_code
}),
daemon=True
).start()
session.process = None
def _send_websocket_message(self, websocket: WebSocket, message: dict):
"""Helper to send websocket message from thread"""
try:
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(websocket.send_json(message))
loop.close()
except Exception:
pass
async def stop_command(self, session: WebFeedbackSession):
if session.process:
try:
# Kill process tree
parent = psutil.Process(session.process.pid)
for child in parent.children(recursive=True):
try:
child.kill()
except psutil.Error:
pass
parent.kill()
session.process = None
await session.websocket.send_json({
"type": "log",
"data": "\nProcess stopped\n"
})
except Exception as e:
await session.websocket.send_json({
"type": "log",
"data": f"\nError stopping process: {str(e)}\n"
})
elif message_type == "stop_command":
if session.process:
try:
session.process.terminate()
except:
pass
def create_session(self, project_directory: str, summary: str) -> str:
"""創建新的回饋會話"""
session_id = str(uuid.uuid4())
session = WebFeedbackSession(session_id, project_directory, summary)
self.sessions[session_id] = session
return session_id
def get_session(self, session_id: str) -> Optional[WebFeedbackSession]:
"""獲取會話"""
return self.sessions.get(session_id)
def remove_session(self, session_id: str):
"""移除會話"""
if session_id in self.sessions:
session = self.sessions[session_id]
if session.process:
try:
session.process.terminate()
except:
pass
del self.sessions[session_id]
def start_server(self):
"""Start the web server in a separate thread"""
if self.server_process is not None:
return # Server already running
"""啟動伺服器"""
def run_server():
uvicorn.run(
self.app,
@ -298,58 +351,169 @@ class WebUIManager:
log_level="error",
access_log=False
)
self.server_process = threading.Thread(target=run_server, daemon=True)
self.server_process.start()
# Wait a moment for server to start
time.sleep(1)
def open_browser(self, session_id: str):
"""Open browser to the session page"""
url = f"http://{self.host}:{self.port}/session/{session_id}"
self.server_thread = threading.Thread(target=run_server, daemon=True)
self.server_thread.start()
# 等待伺服器啟動
time.sleep(2)
def open_browser(self, url: str):
"""開啟瀏覽器"""
try:
webbrowser.open(url)
except Exception:
print(f"Please open your browser and navigate to: {url}")
except Exception as e:
print(f"無法開啟瀏覽器: {e}")
def _get_simple_index_html(self) -> str:
"""簡單的首頁 HTML"""
return """
<!DOCTYPE html>
<html lang="zh-TW">
<head>
<meta charset="UTF-8">
<title>Interactive Feedback MCP</title>
</head>
<body>
<h1>Interactive Feedback MCP Web UI</h1>
<p>服務器運行中...</p>
</body>
</html>
"""
def _get_simple_feedback_html(self, session_id: str, session: WebFeedbackSession) -> str:
"""簡單的回饋頁面 HTML"""
return f"""
<!DOCTYPE html>
<html lang="zh-TW">
<head>
<meta charset="UTF-8">
<title>回饋收集</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; background: #1e1e1e; color: white; }}
.container {{ max-width: 800px; margin: 0 auto; }}
textarea {{ width: 100%; height: 200px; background: #2d2d30; color: white; border: 1px solid #464647; }}
button {{ background: #007acc; color: white; padding: 10px 20px; border: none; cursor: pointer; }}
</style>
</head>
<body>
<div class="container">
<h1>回饋收集</h1>
<div>
<h3>AI 工作摘要:</h3>
<p>{session.summary}</p>
</div>
<div>
<h3>您的回饋:</h3>
<textarea id="feedback" placeholder="請輸入您的回饋..."></textarea>
</div>
<button onclick="submitFeedback()">提交回饋</button>
</div>
<script>
const ws = new WebSocket('ws://localhost:{self.port}/ws/{session_id}');
function submitFeedback() {{
const feedback = document.getElementById('feedback').value;
ws.send(JSON.stringify({{
type: 'submit_feedback',
feedback: feedback,
images: []
}}));
alert('回饋已提交!');
}}
</script>
</body>
</html>
"""
def wait_for_feedback(self, session_id: str, timeout: int = 300) -> dict:
"""Wait for user feedback with timeout"""
session = self.sessions.get(session_id)
# ===== 全域管理器 =====
_web_ui_manager: Optional[WebUIManager] = None
def get_web_ui_manager() -> WebUIManager:
"""獲取全域 Web UI 管理器"""
global _web_ui_manager
if _web_ui_manager is None:
_web_ui_manager = WebUIManager()
_web_ui_manager.start_server()
return _web_ui_manager
async def launch_web_feedback_ui(project_directory: str, summary: str) -> dict:
"""啟動 Web 回饋 UI 並等待回饋"""
manager = get_web_ui_manager()
# 創建會話
session_id = manager.create_session(project_directory, summary)
session_url = f"http://{manager.host}:{manager.port}/session/{session_id}"
print(f"🌐 Web UI 已啟動: {session_url}")
# 開啟瀏覽器
manager.open_browser(session_url)
try:
# 等待用戶回饋
session = manager.get_session(session_id)
if not session:
return {"command_logs": "", "interactive_feedback": "Session not found"}
raise RuntimeError("會話創建失敗")
# Wait for feedback with timeout
start_time = time.time()
while not session.completed:
if time.time() - start_time > timeout:
return {"command_logs": "", "interactive_feedback": "Timeout waiting for feedback"}
time.sleep(0.1)
result = {
"command_logs": "".join(session.command_logs),
"interactive_feedback": session.feedback_result or ""
}
# Clean up session
del self.sessions[session_id]
result = await session.wait_for_feedback(timeout=600) # 10分鐘超時
return result
except TimeoutError:
print("⏰ 等待用戶回饋超時")
return {
"logs": "",
"interactive_feedback": "回饋超時",
"images": []
}
except Exception as e:
print(f"❌ Web UI 錯誤: {e}")
return {
"logs": "",
"interactive_feedback": f"錯誤: {str(e)}",
"images": []
}
finally:
# 清理會話
manager.remove_session(session_id)
# Global instance
web_ui_manager = WebUIManager()
def stop_web_ui():
"""停止 Web UI"""
global _web_ui_manager
if _web_ui_manager:
# 清理所有會話
for session_id in list(_web_ui_manager.sessions.keys()):
_web_ui_manager.remove_session(session_id)
_web_ui_manager = None
def launch_web_feedback_ui(project_directory: str, summary: str) -> dict:
"""Launch web UI and wait for feedback"""
# ===== 主程式入口 =====
if __name__ == "__main__":
import argparse
# Start server if not running
web_ui_manager.start_server()
parser = argparse.ArgumentParser(description="啟動 Interactive Feedback MCP Web UI")
parser.add_argument("--host", default="127.0.0.1", help="主機地址")
parser.add_argument("--port", type=int, default=8765, help="端口")
parser.add_argument("--project-directory", default=os.getcwd(), help="專案目錄")
parser.add_argument("--summary", default="測試 Web UI 功能", help="任務摘要")
# Create new session
session_id = web_ui_manager.create_session(project_directory, summary)
args = parser.parse_args()
# Open browser
web_ui_manager.open_browser(session_id)
async def main():
manager = WebUIManager(args.host, args.port)
manager.start_server()
session_id = manager.create_session(args.project_directory, args.summary)
session_url = f"http://{args.host}:{args.port}/session/{session_id}"
print(f"🌐 Web UI 已啟動: {session_url}")
manager.open_browser(session_url)
try:
# 保持運行
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("\n👋 Web UI 已停止")
# Wait for feedback
return web_ui_manager.wait_for_feedback(session_id)
asyncio.run(main())