""" 药店销售预测系统 - XGBoost 模型训练器 (插件式) """ import time import os import pandas as pd import numpy as np import xgboost as xgb from sklearn.preprocessing import MinMaxScaler from xgboost.callback import EarlyStopping import json # 导入核心工具 from utils.data_utils import prepare_tabular_data from analysis.metrics import evaluate_model from utils.model_manager import model_manager from models.model_registry import register_trainer from utils.visualization import plot_loss_curve # 导入绘图函数 def train_product_model_with_xgboost( model_identifier: str, training_df: pd.DataFrame, feature_list: list, training_mode: str, epochs: int = 500, # XGBoost通常需要更多轮次 sequence_length: int = 1, # 对于非序列模型,此参数意义不大,但为兼容性保留 forecast_horizon: int = 1, model_dir: str = 'saved_models', product_id: str = None, store_id: str = None, aggregation_method: str = None, version: str = None, **kwargs ): """ 使用 XGBoost 模型训练产品销售预测模型 (新数据管道版)。 """ print(f"🚀 XGBoost训练器启动: model_identifier='{model_identifier}'") created_files = [] success = False try: # --- 1. 数据准备和验证 --- if training_df.empty: raise ValueError("用于训练的数据为空") product_name = training_df['product_name'].iloc[0] if 'product_name' in training_df.columns else model_identifier # --- 2. 数据预处理和适配 --- print(f"[XGBoost] 开始数据预处理,使用 {len(feature_list)} 个预选特征...") trainX, testX, trainY, testY, scaler_X, scaler_y, used_features = prepare_tabular_data( training_df=training_df, feature_list=feature_list, target_column='net_sales_quantity' ) dtrain = xgb.DMatrix(trainX, label=trainY) dtest = xgb.DMatrix(testX, label=testY) # --- 3. 模型训练 --- xgb_params = { 'learning_rate': kwargs.get('learning_rate', 0.08), 'subsample': kwargs.get('subsample', 0.75), 'colsample_bytree': kwargs.get('colsample_bytree', 1), 'max_depth': kwargs.get('max_depth', 7), 'gamma': kwargs.get('gamma', 0), 'objective': 'reg:squarederror', 'eval_metric': 'rmse', 'n_jobs': -1 } # 核心修复:使用前端传入的epochs作为训练轮次 (num_boost_round) n_estimators = epochs print(f"开始训练XGBoost模型 (使用核心xgb.train API),共 {n_estimators} 轮...") current_version = model_manager.peek_next_version( model_type='xgboost', product_id=product_id, store_id=store_id, training_mode=training_mode, aggregation_method=aggregation_method ) print(f"🔒 本次训练版本锁定为: {current_version}") start_time = time.time() evals_result = {} model = xgb.train( params=xgb_params, dtrain=dtrain, num_boost_round=n_estimators, evals=[(dtrain, 'train'), (dtest, 'test')], early_stopping_rounds=50, evals_result=evals_result, verbose_eval=False ) training_time = time.time() - start_time print(f"XGBoost模型训练完成,耗时: {training_time:.2f}秒") # --- 4. 模型评估 --- test_pred = model.predict(dtest, iteration_range=(0, model.best_iteration)) # 核心修复:确保真实值和预测值都进行反归一化 test_pred_inv = scaler_y.inverse_transform(test_pred.reshape(-1, 1)) test_true_inv = scaler_y.inverse_transform(testY.reshape(-1, 1)) metrics = evaluate_model(test_true_inv.flatten(), test_pred_inv.flatten()) metrics.update({'training_time': training_time, 'best_iteration': model.best_iteration}) print(f"\n模型评估指标 (真实值): MSE={metrics['mse']:.4f}, RMSE={metrics['rmse']:.4f}, MAE={metrics['mae']:.4f}, R²={metrics['r2']:.4f}, MAPE={metrics['mape']:.2f}%") # --- 5. 保存工件 --- scope = training_mode identifier = product_id if scope == 'product' else store_id if scope == 'store' else aggregation_method if scope == 'global' else product_name # 核心修复:安全地提取损失历史数据 train_losses = evals_result.get('train', {}).get('rmse', []) test_losses = evals_result.get('test', {}).get('rmse', []) # 准备X轴数据 (boosting rounds) rounds = list(range(1, len(train_losses) + 1)) if train_losses else [] # 绘制损失曲线 loss_curve_path = plot_loss_curve( train_losses=train_losses, val_losses=test_losses, model_type='xgboost', scope=scope, identifier=identifier, version=current_version, model_dir=model_dir, x_axis_data=rounds # 传递X轴数据 ) created_files.append(loss_curve_path) print(f"📈 损失曲线已保存: {loss_curve_path}") # 保存模型文件 base_model_filename = model_manager.generate_model_filename( model_type='xgboost', version=current_version, training_mode=training_mode, product_id=product_id, store_id=store_id, aggregation_method=aggregation_method ).replace('.pth', '') # 移除默认扩展名 final_model_filename = f"{base_model_filename}.json" # 明确指定JSON格式 final_model_path = os.path.join(model_dir, final_model_filename) model.save_model(final_model_path) created_files.append(final_model_path) print(f"✅ 最终模型文件已创建 (JSON格式): {final_model_path}") # 保存最佳模型引用 best_model_filename = model_manager.generate_model_filename( model_type='xgboost', version=f"{current_version}_best", training_mode=training_mode, product_id=product_id, store_id=store_id, aggregation_method=aggregation_method ).replace('.pth', '') # 移除默认扩展名 best_model_filename_json = f"{best_model_filename}.json" # 明确指定JSON格式 best_model_path = os.path.join(model_dir, best_model_filename_json) model.save_model(best_model_path) created_files.append(best_model_path) print(f"✅ 最佳模型引用文件已创建 (JSON格式): {best_model_path}") # 保存损失历史 base_filename = os.path.splitext(final_model_filename)[0] loss_data_filename = f"{base_filename}_loss_curve_data.json" loss_data_path = os.path.join(model_dir, loss_data_filename) with open(loss_data_path, 'w') as f: json.dump({ 'epochs': rounds, # 使用正确的boosting rounds作为epochs 'train_loss': train_losses, 'test_loss': test_losses }, f) created_files.append(loss_data_path) print(f"💾 损失历史数据已保存: {loss_data_path}") artifacts = { "versioned_model": final_model_path, "loss_curve_plot": loss_curve_path, "loss_curve_data": loss_data_path, "best_model": best_model_path, "version": current_version } success = True return metrics, artifacts finally: if not success: print("❌ 训练失败,正在回滚并删除已创建的文件...") for file_path in created_files: try: if os.path.exists(file_path): os.remove(file_path) print(f" - 已删除: {file_path}") except OSError as e: print(f" - 警告: 删除文件 '{file_path}' 失败: {e}") # --- 将此训练器注册到系统中 --- register_trainer('xgboost', train_product_model_with_xgboost)