ShopTRAINING/server/trainers/xgboost_trainer.py

215 lines
7.5 KiB
Python
Raw Normal View History

2025-07-22 15:40:37 +08:00
"""
药店销售预测系统 - XGBoost 模型训练器 (插件式)
"""
import time
import pandas as pd
import numpy as np
import xgboost as xgb
from sklearn.preprocessing import MinMaxScaler
from xgboost.callback import EarlyStopping
解决模型保存过程中的原子性问题,以确保系统在遇到任何错误时都能保持数据的一致性,避免产生“孤儿文件”或不完整的数据库记录。 主要变更和成果: 实现了原子化的模型保存逻辑: 在核心的 PharmacyPredictor 类 (ShopTRAINING/server/core/predictor.py) 中,我们引入了一个新的私有方法 _save_model_transactional。 此方法将所有与模型保存相关的操作(包括所有文件工件的磁盘写入和元数据到数据库的写入)封装成一个单一的事务性操作。 通过使用 try...except 块,我们确保了只有在数据库成功记录元数据后,整个保存操作才被视为成功。 引入了自动回滚机制: 如果在 _save_model_transactional 方法执行期间发生任何错误(例如,数据库连接失败、磁盘空间不足等),except 块会触发回滚。 回滚操作由 _rollback_files 方法执行,它会负责删除在该次失败的训练过程中已经生成的所有文件(如模型文件 .pth、损失曲线图 .png 和损失数据 .json)。 这保证了文件系统不会留下任何与数据库记录不匹配的“孤儿”文件,从而维护了系统的整洁和数据一致性。 重构了核心训练流程: 修改了 PharmacyPredictor 类中的 train_model 方法,使其在训练器成功返回结果后,不再直接返回,而是调用新的 _save_model_transactional 方法。 这使得保存逻辑从 api.py 的路由处理函数中解耦,并集中到负责业务流程编排的 predictor 核心类中,使代码结构更清晰,职责更分明。 相应地,api.py 中的后台训练任务逻辑也进行了简化,它现在只需信任 predictor 返回的结果,无需再关心具体的保存细节。
2025-07-29 16:28:13 +08:00
import json
2025-07-22 15:40:37 +08:00
# 导入核心工具
from utils.data_utils import prepare_tabular_data
2025-07-22 15:40:37 +08:00
from analysis.metrics import evaluate_model
from utils.model_manager import model_manager
from models.model_registry import register_trainer
from utils.visualization import plot_loss_curve # 导入绘图函数
2025-07-22 15:40:37 +08:00
def train_product_model_with_xgboost(
model_identifier: str,
training_df: pd.DataFrame,
feature_list: list,
training_mode: str,
epochs: int = 500, # XGBoost通常需要更多轮次
sequence_length: int = 1, # 对于非序列模型,此参数意义不大,但为兼容性保留
forecast_horizon: int = 1,
model_dir: str = 'saved_models',
product_id: str = None,
store_id: str = None,
aggregation_method: str = None,
version: str = None,
**kwargs
):
2025-07-22 15:40:37 +08:00
"""
使用 XGBoost 模型训练产品销售预测模型 (新数据管道版)
2025-07-22 15:40:37 +08:00
"""
print(f"🚀 XGBoost训练器启动: model_identifier='{model_identifier}'")
# --- 1. 数据准备和验证 ---
if training_df.empty:
raise ValueError(f"用于训练的数据为空")
2025-07-22 15:40:37 +08:00
product_name = training_df['product_name'].iloc[0] if 'product_name' in training_df.columns else model_identifier
2025-07-22 15:40:37 +08:00
# --- 2. 数据预处理和适配 ---
print(f"[XGBoost] 开始数据预处理,使用 {len(feature_list)} 个预选特征...")
2025-07-22 15:40:37 +08:00
# 使用标准化的表格数据准备函数
2025-07-29 11:28:27 +08:00
trainX, testX, trainY, testY, scaler_X, scaler_y, used_features = prepare_tabular_data(
training_df=training_df,
feature_list=feature_list,
target_column='net_sales_quantity'
)
2025-07-22 15:40:37 +08:00
# 转换为 XGBoost 核心 DMatrix 格式
2025-07-22 15:40:37 +08:00
dtrain = xgb.DMatrix(trainX, label=trainY)
dtest = xgb.DMatrix(testX, label=testY)
# --- 3. 模型训练 (使用核心 xgb.train API) ---
xgb_params = {
'learning_rate': kwargs.get('learning_rate', 0.08),
'subsample': kwargs.get('subsample', 0.75),
'colsample_bytree': kwargs.get('colsample_bytree', 1),
'max_depth': kwargs.get('max_depth', 7),
'gamma': kwargs.get('gamma', 0),
'objective': 'reg:squarederror',
'eval_metric': 'rmse', # eval_metric 在这里是原生支持的
'n_jobs': -1
}
n_estimators = kwargs.get('n_estimators', 500)
print("开始训练XGBoost模型 (使用核心xgb.train API)...")
# 版本锁定
current_version = model_manager.peek_next_version(
model_type='xgboost',
product_id=product_id,
store_id=store_id,
training_mode=training_mode,
aggregation_method=aggregation_method
)
print(f"🔒 本次训练版本锁定为: {current_version}")
2025-07-22 15:40:37 +08:00
start_time = time.time()
evals_result = {}
model = xgb.train(
params=xgb_params,
dtrain=dtrain,
num_boost_round=n_estimators,
evals=[(dtrain, 'train'), (dtest, 'test')],
early_stopping_rounds=50, # early_stopping_rounds 在这里是原生支持的
evals_result=evals_result,
verbose_eval=False
)
training_time = time.time() - start_time
print(f"XGBoost模型训练完成耗时: {training_time:.2f}")
# --- 4. 模型评估与可视化 ---
2025-07-22 15:40:37 +08:00
# 使用 model.best_iteration 获取最佳轮次的预测结果
test_pred = model.predict(dtest, iteration_range=(0, model.best_iteration))
# 反归一化时需要将1D数组变回2D
test_pred_inv = scaler_y.inverse_transform(test_pred.reshape(-1, 1))
test_true_inv = scaler_y.inverse_transform(testY.reshape(-1, 1))
2025-07-22 15:40:37 +08:00
metrics = evaluate_model(test_true_inv.flatten(), test_pred_inv.flatten())
metrics['training_time'] = training_time
metrics['best_iteration'] = model.best_iteration
2025-07-22 15:40:37 +08:00
print("\n模型评估指标:")
print(f"MSE: {metrics['mse']:.4f}, RMSE: {metrics['rmse']:.4f}, MAE: {metrics['mae']:.4f}, R²: {metrics['r2']:.4f}, MAPE: {metrics['mape']:.2f}%")
# 提取损失并绘制曲线
train_losses = evals_result['train']['rmse']
test_losses = evals_result['test']['rmse']
# --- 5. 保存工件 ---
# 准备 scope 和 identifier 以生成标准化的文件名
scope = training_mode
if scope == 'product':
identifier = product_id
elif scope == 'store':
identifier = store_id
elif scope == 'global':
identifier = aggregation_method
else:
identifier = product_name # 后备方案
# 绘制带有版本号的损失曲线图
loss_curve_path = plot_loss_curve(
train_losses=train_losses,
val_losses=test_losses,
model_type='xgboost',
scope=scope,
identifier=identifier,
version=current_version, # 使用锁定的版本
model_dir=model_dir
)
print(f"📈 带版本号的损失曲线已保存: {loss_curve_path}")
# 准备要保存的最终模型数据
2025-07-22 15:40:37 +08:00
model_data = {
'model_state_dict': model, # 直接保存模型对象
'scaler_X': scaler_X,
'scaler_y': scaler_y,
'config': {
'sequence_length': sequence_length,
'forecast_horizon': forecast_horizon,
'model_type': 'xgboost',
2025-07-29 11:28:27 +08:00
'features': used_features,
2025-07-22 15:40:37 +08:00
'xgb_params': xgb_params
},
'metrics': metrics,
'loss_history': evals_result,
'loss_curve_path': loss_curve_path # 直接包含路径
2025-07-22 15:40:37 +08:00
}
# 保存最终版本模型
2025-07-22 15:40:37 +08:00
final_model_path, final_version = model_manager.save_model(
model_data=model_data,
product_id=product_id,
model_type='xgboost',
store_id=store_id,
training_mode=training_mode,
aggregation_method=aggregation_method,
product_name=product_name,
version=current_version
2025-07-22 15:40:37 +08:00
)
print(f"✅ XGBoost最终模型已通过统一管理器保存版本: {final_version}")
# XGBoost的 `best_model` 就是它自己,因为 `xgb.train` 内部处理了早停。
# 我们创建一个指向最终模型的 "best" 版本文件,以保持接口一致性。
best_model_path, _ = model_manager.save_model(
model_data=model_data,
product_id=product_id,
model_type='xgboost',
store_id=store_id,
training_mode=training_mode,
aggregation_method=aggregation_method,
product_name=product_name,
version=f"{current_version}_best"
)
print(f"✅ XGBoost最佳模型引用已创建版本: {current_version}_best")
解决模型保存过程中的原子性问题,以确保系统在遇到任何错误时都能保持数据的一致性,避免产生“孤儿文件”或不完整的数据库记录。 主要变更和成果: 实现了原子化的模型保存逻辑: 在核心的 PharmacyPredictor 类 (ShopTRAINING/server/core/predictor.py) 中,我们引入了一个新的私有方法 _save_model_transactional。 此方法将所有与模型保存相关的操作(包括所有文件工件的磁盘写入和元数据到数据库的写入)封装成一个单一的事务性操作。 通过使用 try...except 块,我们确保了只有在数据库成功记录元数据后,整个保存操作才被视为成功。 引入了自动回滚机制: 如果在 _save_model_transactional 方法执行期间发生任何错误(例如,数据库连接失败、磁盘空间不足等),except 块会触发回滚。 回滚操作由 _rollback_files 方法执行,它会负责删除在该次失败的训练过程中已经生成的所有文件(如模型文件 .pth、损失曲线图 .png 和损失数据 .json)。 这保证了文件系统不会留下任何与数据库记录不匹配的“孤儿”文件,从而维护了系统的整洁和数据一致性。 重构了核心训练流程: 修改了 PharmacyPredictor 类中的 train_model 方法,使其在训练器成功返回结果后,不再直接返回,而是调用新的 _save_model_transactional 方法。 这使得保存逻辑从 api.py 的路由处理函数中解耦,并集中到负责业务流程编排的 predictor 核心类中,使代码结构更清晰,职责更分明。 相应地,api.py 中的后台训练任务逻辑也进行了简化,它现在只需信任 predictor 返回的结果,无需再关心具体的保存细节。
2025-07-29 16:28:13 +08:00
# --- 新增保存损失历史为JSON文件 ---
loss_data_filename = f"{identifier}_{current_version}_loss_curve_data.json"
loss_data_path = os.path.join(model_dir, loss_data_filename)
loss_data_to_save = {
'epochs': list(range(1, len(train_losses) + 1)),
'train_loss': train_losses,
'test_loss': test_losses
}
with open(loss_data_path, 'w') as f:
json.dump(loss_data_to_save, f)
print(f"💾 损失历史数据已保存: {loss_data_path}")
# 组装返回的工件
artifacts = {
"versioned_model": final_model_path,
"loss_curve_plot": loss_curve_path,
解决模型保存过程中的原子性问题,以确保系统在遇到任何错误时都能保持数据的一致性,避免产生“孤儿文件”或不完整的数据库记录。 主要变更和成果: 实现了原子化的模型保存逻辑: 在核心的 PharmacyPredictor 类 (ShopTRAINING/server/core/predictor.py) 中,我们引入了一个新的私有方法 _save_model_transactional。 此方法将所有与模型保存相关的操作(包括所有文件工件的磁盘写入和元数据到数据库的写入)封装成一个单一的事务性操作。 通过使用 try...except 块,我们确保了只有在数据库成功记录元数据后,整个保存操作才被视为成功。 引入了自动回滚机制: 如果在 _save_model_transactional 方法执行期间发生任何错误(例如,数据库连接失败、磁盘空间不足等),except 块会触发回滚。 回滚操作由 _rollback_files 方法执行,它会负责删除在该次失败的训练过程中已经生成的所有文件(如模型文件 .pth、损失曲线图 .png 和损失数据 .json)。 这保证了文件系统不会留下任何与数据库记录不匹配的“孤儿”文件,从而维护了系统的整洁和数据一致性。 重构了核心训练流程: 修改了 PharmacyPredictor 类中的 train_model 方法,使其在训练器成功返回结果后,不再直接返回,而是调用新的 _save_model_transactional 方法。 这使得保存逻辑从 api.py 的路由处理函数中解耦,并集中到负责业务流程编排的 predictor 核心类中,使代码结构更清晰,职责更分明。 相应地,api.py 中的后台训练任务逻辑也进行了简化,它现在只需信任 predictor 返回的结果,无需再关心具体的保存细节。
2025-07-29 16:28:13 +08:00
"loss_curve_data": loss_data_path,
"best_model": best_model_path,
解决模型保存过程中的原子性问题,以确保系统在遇到任何错误时都能保持数据的一致性,避免产生“孤儿文件”或不完整的数据库记录。 主要变更和成果: 实现了原子化的模型保存逻辑: 在核心的 PharmacyPredictor 类 (ShopTRAINING/server/core/predictor.py) 中,我们引入了一个新的私有方法 _save_model_transactional。 此方法将所有与模型保存相关的操作(包括所有文件工件的磁盘写入和元数据到数据库的写入)封装成一个单一的事务性操作。 通过使用 try...except 块,我们确保了只有在数据库成功记录元数据后,整个保存操作才被视为成功。 引入了自动回滚机制: 如果在 _save_model_transactional 方法执行期间发生任何错误(例如,数据库连接失败、磁盘空间不足等),except 块会触发回滚。 回滚操作由 _rollback_files 方法执行,它会负责删除在该次失败的训练过程中已经生成的所有文件(如模型文件 .pth、损失曲线图 .png 和损失数据 .json)。 这保证了文件系统不会留下任何与数据库记录不匹配的“孤儿”文件,从而维护了系统的整洁和数据一致性。 重构了核心训练流程: 修改了 PharmacyPredictor 类中的 train_model 方法,使其在训练器成功返回结果后,不再直接返回,而是调用新的 _save_model_transactional 方法。 这使得保存逻辑从 api.py 的路由处理函数中解耦,并集中到负责业务流程编排的 predictor 核心类中,使代码结构更清晰,职责更分明。 相应地,api.py 中的后台训练任务逻辑也进行了简化,它现在只需信任 predictor 返回的结果,无需再关心具体的保存细节。
2025-07-29 16:28:13 +08:00
"version": final_version,
'loss_history': evals_result
}
return metrics, artifacts
2025-07-22 15:40:37 +08:00
# --- 将此训练器注册到系统中 ---
register_trainer('xgboost', train_product_model_with_xgboost)