mcp-feedback-enhanced/tests/unit/test_port_manager.py

250 lines
9.0 KiB
Python
Raw Normal View History

"""
端口管理器測試模組
測試 PortManager 類的各項功能包括
- 端口可用性檢測
- 進程查找和清理
- 增強端口查找
"""
import socket
import sys
2025-06-11 03:25:08 +08:00
import time
from unittest.mock import patch
import pytest
# 添加 src 目錄到 Python 路徑
2025-06-11 03:25:08 +08:00
sys.path.insert(0, "src")
from mcp_feedback_enhanced.web.utils.port_manager import PortManager
class TestPortManager:
"""端口管理器測試類"""
2025-06-11 03:25:08 +08:00
def test_is_port_available_free_port(self):
"""測試檢測空閒端口"""
# 找一個肯定空閒的端口
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
2025-06-11 03:25:08 +08:00
s.bind(("127.0.0.1", 0))
free_port = s.getsockname()[1]
2025-06-11 03:25:08 +08:00
# 測試該端口是否被檢測為可用
2025-06-11 03:25:08 +08:00
assert PortManager.is_port_available("127.0.0.1", free_port) is True
def test_is_port_available_occupied_port(self):
"""測試檢測被占用的端口"""
# 創建一個占用端口的 socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
2025-06-11 03:25:08 +08:00
server_socket.bind(("127.0.0.1", 0))
occupied_port = server_socket.getsockname()[1]
server_socket.listen(1)
2025-06-11 03:25:08 +08:00
try:
# 測試該端口是否被檢測為不可用
2025-06-11 03:25:08 +08:00
assert PortManager.is_port_available("127.0.0.1", occupied_port) is False
finally:
server_socket.close()
2025-06-11 03:25:08 +08:00
def test_find_free_port_enhanced_preferred_available(self):
"""測試當偏好端口可用時的行為"""
# 找一個空閒端口作為偏好端口
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
2025-06-11 03:25:08 +08:00
s.bind(("127.0.0.1", 0))
preferred_port = s.getsockname()[1]
2025-06-11 03:25:08 +08:00
# 測試是否返回偏好端口
result_port = PortManager.find_free_port_enhanced(
2025-06-11 03:25:08 +08:00
preferred_port=preferred_port, auto_cleanup=False
)
assert result_port == preferred_port
2025-06-11 03:25:08 +08:00
def test_find_free_port_enhanced_preferred_occupied(self):
"""測試當偏好端口被占用時的行為"""
# 創建一個占用端口的 socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
2025-06-11 03:25:08 +08:00
server_socket.bind(("127.0.0.1", 0))
occupied_port = server_socket.getsockname()[1]
server_socket.listen(1)
2025-06-11 03:25:08 +08:00
try:
# 測試是否返回其他可用端口
result_port = PortManager.find_free_port_enhanced(
2025-06-11 03:25:08 +08:00
preferred_port=occupied_port, auto_cleanup=False
)
assert result_port != occupied_port
assert result_port > occupied_port # 應該向上查找
2025-06-11 03:25:08 +08:00
# 驗證返回的端口確實可用
2025-06-11 03:25:08 +08:00
assert PortManager.is_port_available("127.0.0.1", result_port) is True
finally:
server_socket.close()
2025-06-11 03:25:08 +08:00
def test_find_process_using_port_no_process(self):
"""測試查找沒有進程占用的端口"""
# 找一個空閒端口
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
2025-06-11 03:25:08 +08:00
s.bind(("127.0.0.1", 0))
free_port = s.getsockname()[1]
2025-06-11 03:25:08 +08:00
# 測試是否正確返回 None
result = PortManager.find_process_using_port(free_port)
assert result is None
2025-06-11 03:25:08 +08:00
def test_find_process_using_port_with_process(self):
"""測試查找有進程占用的端口"""
# 創建一個簡單的測試服務器
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
2025-06-11 03:25:08 +08:00
server_socket.bind(("127.0.0.1", 0))
test_port = server_socket.getsockname()[1]
server_socket.listen(1)
2025-06-11 03:25:08 +08:00
try:
# 測試是否能找到進程信息
result = PortManager.find_process_using_port(test_port)
2025-06-11 03:25:08 +08:00
if result: # 如果找到了進程(在某些環境下可能找不到)
assert isinstance(result, dict)
2025-06-11 03:25:08 +08:00
assert "pid" in result
assert "name" in result
assert "cmdline" in result
assert isinstance(result["pid"], int)
assert result["pid"] > 0
finally:
server_socket.close()
2025-06-11 03:25:08 +08:00
def test_get_port_status_available(self):
"""測試獲取可用端口的狀態"""
# 找一個空閒端口
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
2025-06-11 03:25:08 +08:00
s.bind(("127.0.0.1", 0))
free_port = s.getsockname()[1]
2025-06-11 03:25:08 +08:00
status = PortManager.get_port_status(free_port)
2025-06-11 03:25:08 +08:00
assert status["port"] == free_port
assert status["host"] == "127.0.0.1"
assert status["available"] is True
assert status["process"] is None
assert status["error"] is None
def test_get_port_status_occupied(self):
"""測試獲取被占用端口的狀態"""
# 創建一個占用端口的 socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
2025-06-11 03:25:08 +08:00
server_socket.bind(("127.0.0.1", 0))
occupied_port = server_socket.getsockname()[1]
server_socket.listen(1)
2025-06-11 03:25:08 +08:00
try:
status = PortManager.get_port_status(occupied_port)
2025-06-11 03:25:08 +08:00
assert status["port"] == occupied_port
assert status["host"] == "127.0.0.1"
assert status["available"] is False
# process 可能為 None取決於系統權限
2025-06-11 03:25:08 +08:00
assert status["error"] is None
finally:
server_socket.close()
2025-06-11 03:25:08 +08:00
def test_list_listening_ports(self):
"""測試列出監聽端口"""
# 創建幾個測試服務器
servers = []
test_ports = []
2025-06-11 03:25:08 +08:00
try:
for i in range(2):
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
2025-06-11 03:25:08 +08:00
server_socket.bind(("127.0.0.1", 0))
port = server_socket.getsockname()[1]
server_socket.listen(1)
2025-06-11 03:25:08 +08:00
servers.append(server_socket)
test_ports.append(port)
2025-06-11 03:25:08 +08:00
# 測試列出監聽端口
min_port = min(test_ports) - 10
max_port = max(test_ports) + 10
2025-06-11 03:25:08 +08:00
listening_ports = PortManager.list_listening_ports(min_port, max_port)
2025-06-11 03:25:08 +08:00
# 驗證結果
assert isinstance(listening_ports, list)
2025-06-11 03:25:08 +08:00
# 檢查我們的測試端口是否在列表中
2025-06-11 03:25:08 +08:00
found_ports = [p["port"] for p in listening_ports]
for test_port in test_ports:
if test_port in found_ports:
# 找到了我們的端口,驗證信息完整性
2025-06-11 03:25:08 +08:00
port_info = next(
p for p in listening_ports if p["port"] == test_port
)
assert "host" in port_info
assert "pid" in port_info
assert "process_name" in port_info
assert "cmdline" in port_info
finally:
# 清理測試服務器
for server in servers:
server.close()
2025-06-11 03:25:08 +08:00
@patch("mcp_feedback_enhanced.web.utils.port_manager.psutil.Process")
def test_should_cleanup_process_mcp_process(self, mock_process):
"""測試是否應該清理 MCP 相關進程"""
# 模擬 MCP 相關進程
process_info = {
2025-06-11 03:25:08 +08:00
"pid": 1234,
"name": "python.exe",
"cmdline": "python -m mcp-feedback-enhanced test --web",
"create_time": time.time(),
"status": "running",
}
2025-06-11 03:25:08 +08:00
result = PortManager._should_cleanup_process(process_info)
assert result is True
2025-06-11 03:25:08 +08:00
@patch("mcp_feedback_enhanced.web.utils.port_manager.psutil.Process")
def test_should_cleanup_process_other_process(self, mock_process):
"""測試是否應該清理其他進程"""
# 模擬其他進程
process_info = {
2025-06-11 03:25:08 +08:00
"pid": 5678,
"name": "chrome.exe",
"cmdline": "chrome --new-window",
"create_time": time.time(),
"status": "running",
}
2025-06-11 03:25:08 +08:00
result = PortManager._should_cleanup_process(process_info)
assert result is False
2025-06-11 03:25:08 +08:00
def test_find_free_port_enhanced_max_attempts(self):
"""測試最大嘗試次數限制"""
# 這個測試比較難實現,因為需要占用大量連續端口
# 我們只測試參數是否正確傳遞
try:
result = PortManager.find_free_port_enhanced(
preferred_port=65000, # 使用高端口減少衝突
auto_cleanup=False,
2025-06-11 03:25:08 +08:00
max_attempts=10,
)
assert isinstance(result, int)
assert 65000 <= result <= 65535
except RuntimeError:
# 如果真的找不到端口,這也是正常的
pass
2025-06-11 03:25:08 +08:00
if __name__ == "__main__":
# 運行測試
2025-06-11 03:25:08 +08:00
pytest.main([__file__, "-v"])