""" 药店销售预测系统 - XGBoost 模型训练器 (插件式) """ import time import pandas as pd import numpy as np import xgboost as xgb from sklearn.preprocessing import MinMaxScaler from xgboost.callback import EarlyStopping # 导入核心工具 from utils.data_utils import prepare_tabular_data from analysis.metrics import evaluate_model from utils.model_manager import model_manager from models.model_registry import register_trainer from utils.visualization import plot_loss_curve # 导入绘图函数 def train_product_model_with_xgboost( model_identifier: str, training_df: pd.DataFrame, feature_list: list, training_mode: str, epochs: int = 500, # XGBoost通常需要更多轮次 sequence_length: int = 1, # 对于非序列模型,此参数意义不大,但为兼容性保留 forecast_horizon: int = 1, model_dir: str = 'saved_models', product_id: str = None, store_id: str = None, aggregation_method: str = None, version: str = None, **kwargs ): """ 使用 XGBoost 模型训练产品销售预测模型 (新数据管道版)。 """ print(f"🚀 XGBoost训练器启动: model_identifier='{model_identifier}'") # --- 1. 数据准备和验证 --- if training_df.empty: raise ValueError(f"用于训练的数据为空") product_name = training_df['product_name'].iloc[0] if 'product_name' in training_df.columns else model_identifier # --- 2. 数据预处理和适配 --- print(f"[XGBoost] 开始数据预处理,使用 {len(feature_list)} 个预选特征...") # 使用标准化的表格数据准备函数 trainX, testX, trainY, testY, scaler_X, scaler_y = prepare_tabular_data( training_df=training_df, feature_list=feature_list, target_column='net_sales_quantity' ) # 转换为 XGBoost 核心 DMatrix 格式 dtrain = xgb.DMatrix(trainX, label=trainY) dtest = xgb.DMatrix(testX, label=testY) # --- 3. 模型训练 (使用核心 xgb.train API) --- xgb_params = { 'learning_rate': kwargs.get('learning_rate', 0.08), 'subsample': kwargs.get('subsample', 0.75), 'colsample_bytree': kwargs.get('colsample_bytree', 1), 'max_depth': kwargs.get('max_depth', 7), 'gamma': kwargs.get('gamma', 0), 'objective': 'reg:squarederror', 'eval_metric': 'rmse', # eval_metric 在这里是原生支持的 'n_jobs': -1 } n_estimators = kwargs.get('n_estimators', 500) print("开始训练XGBoost模型 (使用核心xgb.train API)...") # 版本锁定 current_version = model_manager.peek_next_version( model_type='xgboost', product_id=product_id, store_id=store_id, training_mode=training_mode, aggregation_method=aggregation_method ) print(f"🔒 本次训练版本锁定为: {current_version}") start_time = time.time() evals_result = {} model = xgb.train( params=xgb_params, dtrain=dtrain, num_boost_round=n_estimators, evals=[(dtrain, 'train'), (dtest, 'test')], early_stopping_rounds=50, # early_stopping_rounds 在这里是原生支持的 evals_result=evals_result, verbose_eval=False ) training_time = time.time() - start_time print(f"XGBoost模型训练完成,耗时: {training_time:.2f}秒") # --- 4. 模型评估与可视化 --- # 使用 model.best_iteration 获取最佳轮次的预测结果 test_pred = model.predict(dtest, iteration_range=(0, model.best_iteration)) # 反归一化时需要将1D数组变回2D test_pred_inv = scaler_y.inverse_transform(test_pred.reshape(-1, 1)) test_true_inv = scaler_y.inverse_transform(testY.reshape(-1, 1)) metrics = evaluate_model(test_true_inv.flatten(), test_pred_inv.flatten()) metrics['training_time'] = training_time metrics['best_iteration'] = model.best_iteration print("\n模型评估指标:") print(f"MSE: {metrics['mse']:.4f}, RMSE: {metrics['rmse']:.4f}, MAE: {metrics['mae']:.4f}, R²: {metrics['r2']:.4f}, MAPE: {metrics['mape']:.2f}%") # 提取损失并绘制曲线 train_losses = evals_result['train']['rmse'] test_losses = evals_result['test']['rmse'] # --- 5. 
    # --- 5. Save artifacts ---
    # Prepare scope and identifier to build standardized file names
    scope = training_mode
    if scope == 'product':
        identifier = product_id
    elif scope == 'store':
        identifier = store_id
    elif scope == 'global':
        identifier = aggregation_method
    else:
        identifier = product_name  # fallback

    # Plot the versioned loss curve
    loss_curve_path = plot_loss_curve(
        train_losses=train_losses,
        val_losses=test_losses,
        model_type='xgboost',
        scope=scope,
        identifier=identifier,
        version=current_version,  # use the locked version
        model_dir=model_dir
    )
    print(f"📈 Versioned loss curve saved: {loss_curve_path}")

    # Assemble the final model payload to save
    model_data = {
        'model_state_dict': model,  # save the Booster object directly
        'scaler_X': scaler_X,
        'scaler_y': scaler_y,
        'config': {
            'sequence_length': sequence_length,
            'forecast_horizon': forecast_horizon,
            'model_type': 'xgboost',
            'features': feature_list,
            'xgb_params': xgb_params
        },
        'metrics': metrics,
        'loss_history': evals_result,
        'loss_curve_path': loss_curve_path  # include the plot path directly
    }

    # Save the final versioned model
    final_model_path, final_version = model_manager.save_model(
        model_data=model_data,
        product_id=product_id,
        model_type='xgboost',
        store_id=store_id,
        training_mode=training_mode,
        aggregation_method=aggregation_method,
        product_name=product_name,
        version=current_version
    )
    print(f"✅ Final XGBoost model saved via the unified model manager, version: {final_version}")

    # For XGBoost the `best_model` is the model itself, since `xgb.train` handles
    # early stopping internally. Save a "best" version pointing at the final model
    # to keep the interface consistent with the other trainers.
    best_model_path, _ = model_manager.save_model(
        model_data=model_data,
        product_id=product_id,
        model_type='xgboost',
        store_id=store_id,
        training_mode=training_mode,
        aggregation_method=aggregation_method,
        product_name=product_name,
        version=f"{current_version}_best"
    )
    print(f"✅ Best-model reference created, version: {current_version}_best")

    # Assemble the returned artifacts
    artifacts = {
        "versioned_model": final_model_path,
        "loss_curve_plot": loss_curve_path,
        "best_model": best_model_path,
        "version": final_version
    }
    return metrics, artifacts


# --- Register this trainer with the system ---
register_trainer('xgboost', train_product_model_with_xgboost)
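

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the trainer): the core training pattern used
# above -- xgb.DMatrix + xgb.train with early stopping and evals_result --
# demonstrated on synthetic data, without the project-specific helpers
# (prepare_tabular_data, model_manager, plot_loss_curve). The data, split, and
# hyperparameters below are placeholders chosen for illustration only.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    import numpy as np

    rng = np.random.default_rng(42)
    X = rng.random((500, 8))                          # 500 samples, 8 features
    y = X @ rng.random(8) + rng.normal(0, 0.1, 500)   # noisy linear target

    # Simple chronological-style split, mirroring a train/test split
    X_train, X_test = X[:400], X[400:]
    y_train, y_test = y[:400], y[400:]

    demo_dtrain = xgb.DMatrix(X_train, label=y_train)
    demo_dtest = xgb.DMatrix(X_test, label=y_test)

    demo_evals_result = {}
    booster = xgb.train(
        params={'objective': 'reg:squarederror', 'eval_metric': 'rmse',
                'max_depth': 4, 'learning_rate': 0.1},
        dtrain=demo_dtrain,
        num_boost_round=200,
        evals=[(demo_dtrain, 'train'), (demo_dtest, 'test')],
        early_stopping_rounds=20,
        evals_result=demo_evals_result,
        verbose_eval=False
    )

    # Predict with the trees up to and including the best iteration
    preds = booster.predict(demo_dtest, iteration_range=(0, booster.best_iteration + 1))
    print(f"best_iteration={booster.best_iteration}, "
          f"test RMSE at best iteration="
          f"{demo_evals_result['test']['rmse'][booster.best_iteration]:.4f}")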