ShopTRAINING/test/final_training_log_test.py

120 lines
4.1 KiB
Python
Raw Permalink Normal View History

2025-07-02 11:05:23 +08:00
#\!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
最终的训练日志测试
通过直接调用进程管理器来验证日志输出是否正常工作
"""
import os
import sys
import time
import threading
# 设置编码环境
os.environ['PYTHONIOENCODING'] = 'utf-8'
if os.name == 'nt':
try:
os.system('chcp 65001 >nul 2>&1')
if hasattr(sys.stdout, 'reconfigure'):
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
sys.stderr.reconfigure(encoding='utf-8', errors='replace')
except Exception as e:
print(f"Warning: Failed to set UTF-8 encoding: {e}")
def test_training_process_manager():
"""测试修复后的训练进程管理器"""
print("🧪 测试修复后的训练进程管理器")
print("=" * 60)
try:
# 添加server目录到路径
server_dir = os.path.join(os.getcwd(), 'server')
if server_dir not in sys.path:
sys.path.insert(0, server_dir)
print("1⃣ 导入训练进程管理器...")
from utils.training_process_manager import TrainingProcessManager
print("✅ 成功导入进程管理器")
print("\n2⃣ 创建并启动进程管理器...")
manager = TrainingProcessManager(max_workers=1) # 只用1个工作进程便于观察
manager.start()
print("✅ 进程管理器已启动")
# 等待工作进程完全启动
print("⏳ 等待工作进程启动...")
time.sleep(3)
print("\n3⃣ 提交训练任务...")
task_id = manager.submit_task(
product_id="P005",
model_type="transformer",
training_mode="product",
epochs=3
)
print(f"✅ 任务已提交: {task_id[:8]}")
print("\n4⃣ 监控任务执行...")
print("=" * 40)
print("观察控制台输出,应该看到:")
print("- [task_id] 🚀 训练进程启动")
print("- [task_id] 📋 任务参数")
print("- [task_id] 🤖 开始执行 transformer 模型训练...")
print("- [task_id] ✅ transformer 模型训练完成!")
print("- [task_id] 📊 训练指标")
print("=" * 40)
# 监控任务状态
for i in range(60): # 最多等待60秒
status = manager.get_task_status(task_id)
if status:
current_status = status.get('status', 'unknown')
progress = status.get('progress', 0)
if current_status == 'completed':
print(f"\n✅ 任务完成\! (耗时: {i+1}秒)")
metrics = status.get('metrics')
if metrics:
print(f"📊 最终指标: {metrics}")
break
elif current_status == 'failed':
print(f"\n❌ 任务失败\!")
error = status.get('error')
if error:
print(f"错误: {error}")
break
if i % 5 == 0: # 每5秒输出一次状态
print(f"⏱️ [{i+1:2d}/60] 状态: {current_status}, 进度: {progress:.1f}%")
time.sleep(1)
print("\n5⃣ 停止进程管理器...")
manager.stop()
print("✅ 进程管理器已停止")
return True
except Exception as e:
print(f"❌ 测试失败: {e}")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
print("🔧 最终训练日志测试")
print("这将直接测试修复后的进程管理器\n")
success = test_training_process_manager()
print("\n" + "=" * 60)
if success:
print("🎉 测试完成\!")
print("\n📋 如果在上面看到了训练日志输出:")
print("- 说明进程管理器的日志转发机制工作正常")
print("- API服务器在训练时也应该显示类似的日志")
print("- 修复成功!")
else:
print("❌ 测试失败,需要进一步调试")
EOF < /dev/null