#\!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 最终的训练日志测试 通过直接调用进程管理器来验证日志输出是否正常工作 """ import os import sys import time import threading # 设置编码环境 os.environ['PYTHONIOENCODING'] = 'utf-8' if os.name == 'nt': try: os.system('chcp 65001 >nul 2>&1') if hasattr(sys.stdout, 'reconfigure'): sys.stdout.reconfigure(encoding='utf-8', errors='replace') sys.stderr.reconfigure(encoding='utf-8', errors='replace') except Exception as e: print(f"Warning: Failed to set UTF-8 encoding: {e}") def test_training_process_manager(): """测试修复后的训练进程管理器""" print("🧪 测试修复后的训练进程管理器") print("=" * 60) try: # 添加server目录到路径 server_dir = os.path.join(os.getcwd(), 'server') if server_dir not in sys.path: sys.path.insert(0, server_dir) print("1️⃣ 导入训练进程管理器...") from utils.training_process_manager import TrainingProcessManager print("✅ 成功导入进程管理器") print("\n2️⃣ 创建并启动进程管理器...") manager = TrainingProcessManager(max_workers=1) # 只用1个工作进程便于观察 manager.start() print("✅ 进程管理器已启动") # 等待工作进程完全启动 print("⏳ 等待工作进程启动...") time.sleep(3) print("\n3️⃣ 提交训练任务...") task_id = manager.submit_task( product_id="P005", model_type="transformer", training_mode="product", epochs=3 ) print(f"✅ 任务已提交: {task_id[:8]}") print("\n4️⃣ 监控任务执行...") print("=" * 40) print("观察控制台输出,应该看到:") print("- [task_id] 🚀 训练进程启动") print("- [task_id] 📋 任务参数") print("- [task_id] 🤖 开始执行 transformer 模型训练...") print("- [task_id] ✅ transformer 模型训练完成!") print("- [task_id] 📊 训练指标") print("=" * 40) # 监控任务状态 for i in range(60): # 最多等待60秒 status = manager.get_task_status(task_id) if status: current_status = status.get('status', 'unknown') progress = status.get('progress', 0) if current_status == 'completed': print(f"\n✅ 任务完成\! (耗时: {i+1}秒)") metrics = status.get('metrics') if metrics: print(f"📊 最终指标: {metrics}") break elif current_status == 'failed': print(f"\n❌ 任务失败\!") error = status.get('error') if error: print(f"错误: {error}") break if i % 5 == 0: # 每5秒输出一次状态 print(f"⏱️ [{i+1:2d}/60] 状态: {current_status}, 进度: {progress:.1f}%") time.sleep(1) print("\n5️⃣ 停止进程管理器...") manager.stop() print("✅ 进程管理器已停止") return True except Exception as e: print(f"❌ 测试失败: {e}") import traceback traceback.print_exc() return False if __name__ == "__main__": print("🔧 最终训练日志测试") print("这将直接测试修复后的进程管理器\n") success = test_training_process_manager() print("\n" + "=" * 60) if success: print("🎉 测试完成\!") print("\n📋 如果在上面看到了训练日志输出:") print("- 说明进程管理器的日志转发机制工作正常") print("- API服务器在训练时也应该显示类似的日志") print("- 修复成功!") else: print("❌ 测试失败,需要进一步调试") EOF < /dev/null