ShopTRAINING/test/final_training_log_test.py
2025-07-02 11:05:23 +08:00

120 lines
4.1 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
最终的训练日志测试
通过直接调用进程管理器来验证日志输出是否正常工作
"""
import os
import sys
import time
import threading
# Configure the encoding environment.
# PYTHONIOENCODING is exported so that child processes started later inherit
# UTF-8 standard streams; it does not affect the already-running interpreter.
os.environ['PYTHONIOENCODING'] = 'utf-8'
if os.name == 'nt':
    try:
        # Switch the Windows console code page to UTF-8 (output is discarded).
        os.system('chcp 65001 >nul 2>&1')
        # Reconfigure each stream independently: a stream may be None or may
        # have been replaced by an object without ``reconfigure``, and one
        # stream lacking it must not prevent the other from being fixed.
        for _stream in (sys.stdout, sys.stderr):
            _reconfigure = getattr(_stream, 'reconfigure', None)
            if _reconfigure is not None:
                _reconfigure(encoding='utf-8', errors='replace')
    except Exception as e:
        # Best effort only: the script can still run with the default encoding.
        print(f"Warning: Failed to set UTF-8 encoding: {e}")
def test_training_process_manager():
    """Run one training task through the process manager and watch its status.

    The function adds ``<cwd>/server`` to ``sys.path``, imports
    ``TrainingProcessManager``, starts it with a single worker, submits one
    training task and polls ``get_task_status`` once per second for at most
    60 seconds while printing progress to the console.

    Returns:
        bool: True only if the task reached the ``completed`` status and the
        manager was stopped without error; False if the task failed, did not
        finish in time, or any exception was raised along the way.
    """
    print("🧪 测试修复后的训练进程管理器")
    print("=" * 60)

    max_wait_seconds = 60
    manager = None
    succeeded = False
    try:
        # Make the ``server`` directory importable (resolved from the current
        # working directory, so the script must be launched from the project
        # root).
        server_dir = os.path.join(os.getcwd(), 'server')
        if server_dir not in sys.path:
            sys.path.insert(0, server_dir)

        print("1⃣ 导入训练进程管理器...")
        from utils.training_process_manager import TrainingProcessManager
        print("✅ 成功导入进程管理器")

        print("\n2⃣ 创建并启动进程管理器...")
        # A single worker keeps the console output easy to follow.
        manager = TrainingProcessManager(max_workers=1)
        manager.start()
        print("✅ 进程管理器已启动")

        # Give the worker process time to come up before submitting work.
        print("⏳ 等待工作进程启动...")
        time.sleep(3)

        print("\n3⃣ 提交训练任务...")
        task_id = manager.submit_task(
            product_id="P005",
            model_type="transformer",
            training_mode="product",
            epochs=3
        )
        print(f"✅ 任务已提交: {task_id[:8]}")

        print("\n4⃣ 监控任务执行...")
        print("=" * 40)
        print("观察控制台输出,应该看到:")
        print("- [task_id] 🚀 训练进程启动")
        print("- [task_id] 📋 任务参数")
        print("- [task_id] 🤖 开始执行 transformer 模型训练...")
        print("- [task_id] ✅ transformer 模型训练完成!")
        print("- [task_id] 📊 训练指标")
        print("=" * 40)

        # Poll the task status once per second.
        for i in range(max_wait_seconds):
            status = manager.get_task_status(task_id)
            if status:
                current_status = status.get('status', 'unknown')
                # Treat a missing or None progress as 0 so formatting is safe.
                progress = status.get('progress') or 0

                if current_status == 'completed':
                    print(f"\n✅ 任务完成! (耗时: {i+1}秒)")
                    metrics = status.get('metrics')
                    if metrics:
                        print(f"📊 最终指标: {metrics}")
                    succeeded = True
                    break
                elif current_status == 'failed':
                    print("\n❌ 任务失败!")
                    error = status.get('error')
                    if error:
                        print(f"错误: {error}")
                    break

                if i % 5 == 0:  # report the status every 5 seconds
                    print(f"⏱️ [{i+1:2d}/{max_wait_seconds}] 状态: {current_status}, 进度: {progress:.1f}%")
            time.sleep(1)
        else:
            # The loop ran out without a terminal status: report the timeout
            # instead of silently treating it as success.
            print(f"\n⏰ 任务未在 {max_wait_seconds} 秒内完成")
    except Exception as e:
        print(f"❌ 测试失败: {e}")
        import traceback
        traceback.print_exc()
        succeeded = False
    finally:
        # Always stop the manager once it has been created, so worker
        # processes are not left running after a failure.
        if manager is not None:
            try:
                print("\n5⃣ 停止进程管理器...")
                manager.stop()
                print("✅ 进程管理器已停止")
            except Exception as stop_error:
                print(f"❌ 测试失败: {stop_error}")
                succeeded = False
    return succeeded
if __name__ == "__main__":
    # Script entry point: run the process-manager check and print a summary.
    print("🔧 最终训练日志测试")
    print("这将直接测试修复后的进程管理器\n")
    success = test_training_process_manager()
    print("\n" + "=" * 60)
    if success:
        print("🎉 测试完成!")
        print("\n📋 如果在上面看到了训练日志输出:")
        print("- 说明进程管理器的日志转发机制工作正常")
        print("- API服务器在训练时也应该显示类似的日志")
        print("- 修复成功!")
    else:
        print("❌ 测试失败,需要进一步调试")