120 lines
4.1 KiB
Python
120 lines
4.1 KiB
Python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
|
||
最终的训练日志测试
|
||
通过直接调用进程管理器来验证日志输出是否正常工作
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import time
|
||
import threading
|
||
|
||
# Configure UTF-8 console output.
# NOTE: PYTHONIOENCODING is read by the interpreter at start-up, so setting it
# here cannot change the streams of the current process; it is inherited by
# child Python processes started afterwards (presumably the training workers
# -- confirm against the process manager).
os.environ['PYTHONIOENCODING'] = 'utf-8'
if os.name == 'nt':
    try:
        # Switch the Windows console to code page 65001 (UTF-8); the command's
        # own output is discarded.
        os.system('chcp 65001 >nul 2>&1')
        # Check each stream on its own: either one may have been replaced by
        # an object that has no reconfigure() method.
        for _stream in (sys.stdout, sys.stderr):
            if hasattr(_stream, 'reconfigure'):
                _stream.reconfigure(encoding='utf-8', errors='replace')
    except Exception as e:
        # Best effort only: an encoding problem must not abort the script.
        print(f"Warning: Failed to set UTF-8 encoding: {e}")
|
||
|
||
def _wait_for_task(manager, task_id, max_polls=60, poll_interval=1):
    """Poll ``manager`` for ``task_id`` until it completes, fails or times out.

    Args:
        manager: Object exposing ``get_task_status(task_id)``; the status is
            read with ``.get`` for the keys ``status``, ``progress``,
            ``metrics`` and ``error``.
        task_id: Identifier returned by ``manager.submit_task``.
        max_polls: Maximum number of status polls before giving up.
        poll_interval: Seconds to sleep between polls.

    Returns:
        bool: ``True`` only when the task reports status ``'completed'``;
        ``False`` when it reports ``'failed'`` or the polls run out.
    """
    for attempt in range(max_polls):
        status = manager.get_task_status(task_id)
        if status:
            current_status = status.get('status', 'unknown')
            # Fall back to 0 when the key is missing or None so the ':.1f'
            # format below cannot fail on None.
            progress = status.get('progress') or 0

            if current_status == 'completed':
                print(f"\n✅ 任务完成! (耗时: {attempt + 1}秒)")
                metrics = status.get('metrics')
                if metrics:
                    print(f"📊 最终指标: {metrics}")
                return True

            if current_status == 'failed':
                print("\n❌ 任务失败!")
                error = status.get('error')
                if error:
                    print(f"错误: {error}")
                return False

            if attempt % 5 == 0:  # report the status every fifth poll
                print(
                    f"⏱️ [{attempt + 1:2d}/{max_polls}] "
                    f"状态: {current_status}, 进度: {progress:.1f}%"
                )

        time.sleep(poll_interval)

    # Neither 'completed' nor 'failed' was observed: treat as a failure
    # instead of silently reporting success.
    print(f"\n⏰ 任务未在规定时间内完成 (已轮询 {max_polls} 次)")
    return False


def test_training_process_manager():
    """Run one training task through TrainingProcessManager and watch its logs.

    Adds ``<cwd>/server`` to ``sys.path``, imports and starts a
    ``TrainingProcessManager`` with a single worker, submits one
    ``transformer`` task for product ``P005`` (3 epochs) and polls its status
    once per second for at most 60 polls while printing progress.

    The manager is always stopped once it has been created, even when
    submitting or monitoring the task raises.

    Returns:
        bool: ``True`` only when the task reaches status ``'completed'``;
        ``False`` when it fails, times out, or any exception is raised
        (the exception is printed together with its traceback).
    """
    print("🧪 测试修复后的训练进程管理器")
    print("=" * 60)

    manager = None
    try:
        # Make the ``server`` directory of the current working directory
        # importable so that ``utils`` resolves.
        server_dir = os.path.join(os.getcwd(), 'server')
        if server_dir not in sys.path:
            sys.path.insert(0, server_dir)

        print("1️⃣ 导入训练进程管理器...")
        from utils.training_process_manager import TrainingProcessManager
        print("✅ 成功导入进程管理器")

        print("\n2️⃣ 创建并启动进程管理器...")
        # A single worker keeps the console output easy to follow.
        manager = TrainingProcessManager(max_workers=1)
        manager.start()
        print("✅ 进程管理器已启动")

        # Give the worker process time to come up before submitting work.
        print("⏳ 等待工作进程启动...")
        time.sleep(3)

        print("\n3️⃣ 提交训练任务...")
        task_id = manager.submit_task(
            product_id="P005",
            model_type="transformer",
            training_mode="product",
            epochs=3,
        )
        print(f"✅ 任务已提交: {task_id[:8]}")

        print("\n4️⃣ 监控任务执行...")
        print("=" * 40)
        print("观察控制台输出,应该看到:")
        print("- [task_id] 🚀 训练进程启动")
        print("- [task_id] 📋 任务参数")
        print("- [task_id] 🤖 开始执行 transformer 模型训练...")
        print("- [task_id] ✅ transformer 模型训练完成!")
        print("- [task_id] 📊 训练指标")
        print("=" * 40)

        return _wait_for_task(manager, task_id)

    except Exception as e:
        print(f"❌ 测试失败: {e}")
        import traceback
        traceback.print_exc()
        return False

    finally:
        # Always shut the manager down so worker processes are not leaked
        # when an earlier step raised.
        if manager is not None:
            print("\n5️⃣ 停止进程管理器...")
            try:
                manager.stop()
                print("✅ 进程管理器已停止")
            except Exception as stop_error:
                # A failing shutdown must not mask the test result.
                print(f"⚠️ 停止进程管理器失败: {stop_error}")
|
||
|
||
if __name__ == "__main__":
    # Script entry point: run the manager test once and summarise the result.
    print("🔧 最终训练日志测试")
    print("这将直接测试修复后的进程管理器\n")

    success = test_training_process_manager()

    print("\n" + "=" * 60)
    if success:
        print("🎉 测试完成!")
        print("\n📋 如果在上面看到了训练日志输出:")
        print("- 说明进程管理器的日志转发机制工作正常")
        print("- API服务器在训练时也应该显示类似的日志")
        print("- 修复成功!")
    else:
        print("❌ 测试失败,需要进一步调试")

    # Propagate the outcome through the exit status so shells and CI jobs can
    # detect a failed run.
    sys.exit(0 if success else 1)
|