Compare commits

...

2 Commits

12 changed files with 296 additions and 448 deletions

View File

@ -251,3 +251,54 @@
3. 将所有模型的版本管理逻辑和工程实现标准完全对齐。
4. 创建并完善了核心技术文档,固化了开发规范。
- **项目状态**: 系统现在处于一个健壮、一致且可扩展的稳定状态。
---
## 2025-07-18: 系统性重构模型版本管理机制
**开发者**: lyf
### 14:00 - 根治版本混乱与模型加载失败问题
- **问题现象**: `KAN` 及其他算法在训练后,预测时出现版本号混乱(如出现裸数字 `1``3``best` 等无效版本)、版本重复、以及因版本不匹配导致的“模型文件未找到”的 `404` 错误。
- **根本原因深度分析**:
1. **逻辑分散**: 版本生成的逻辑分散在各个训练器 (`trainer`) 中,而版本发现的逻辑在 `config.py` 中,两者标准不一,充满冲突的正则表达式和硬编码规则。
2. **命名不统一**: `KAN` 训练器使用 `model_manager` 保存,而其他训练器使用本地的 `save_checkpoint` 函数,导致了 `..._product_..._v1.pth``..._epoch_best.pth` 等多种不兼容的命名格式并存。
3. **提取错误**: `config.py` 中的 `get_model_versions` 函数因其过于宽泛和冲突的匹配规则,会从文件名中错误地提取出无效的版本号,是导致前端下拉框内容混乱的直接原因。
- **系统性重构解决方案**:
1. **确立单一权威**: 将 [`server/utils/model_manager.py`](server/utils/model_manager.py:1) 确立为系统中唯一负责版本管理、模型命名和文件IO的组件。
2. **实现自动版本控制**: 在 `ModelManager` 中增加了 `_get_next_version` 内部方法,使其能够自动扫描现有文件,并安全地生成下一个递增的、带 `v` 前缀的版本号(如 `v3`)。
3. **统一所有训练器**: 全面重构了 `kan_trainer.py`, `mlstm_trainer.py`, `tcn_trainer.py`, 和 `transformer_trainer.py`。现在,所有训练器在保存最终模型时,都调用 `model_manager.save_model` 并且**不再自行决定版本号**,完全由 `ModelManager` 自动生成。对于训练过程中的最佳模型,则统一显式保存为 `best` 版本。
4. **清理与加固**: 废弃并删除了 `config.py` 中所有旧的、有问题的版本管理函数,并重写了 `get_model_versions`,使其只使用严格的正则表达式来查找和解析符合新命名规范的模型版本。
5. **优化API**: 更新了 `api.py`,使其完全与新的 `ModelManager` 对接,并改进了预测失败时的错误信息反馈。
- **结论**: 通过这次重构系统的版本管理机制从一个分散、混乱、充满硬编码的状态升级为了一个集中的、统一的、自动化的健壮系统。所有已知相关的bug已被从根本上解决。
---
## 2025-07-18 (续): 实现“按店铺”AI闭环及连锁Bug修复
**开发者**: lyf
### 15:00 - 架构升级:实现“按店铺”训练与预测功能
- **任务目标**: 在现有“按药品”模式基础上增加并打通“按店铺”维度的完整AI闭环。
- **核心挑战**: 需要对数据处理、模型标识、训练流程和API调用进行系统性改造以支持新的训练模式。
- **解决方案 (四步重构)**:
1. **升级 `ModelManager`**: 重新设计了模型命名规则,为店铺和全局模型提供了清晰、无歧义的标识(如 `transformer_store_S001_v1.pth`),并同步更新了解析逻辑。
2. **修正核心预测器**: 修复了 `predictor.py` 中的关键逻辑缺陷,确保在店铺模式下,系统能生成并使用正确的 `model_identifier`(如 `store_S001`),并强制调用数据聚合函数。
3. **适配API层**: 调整了 `api.py` 中的训练和预测接口,使其能够兼容和正确处理新的店铺模式请求。
4. **统一所有训练器**: 对全部四个训练器文件进行了统一修改,确保它们在保存模型时,都正确地使用了新的 `model_identifier`
### 15:30 - 连锁Bug修复第一环解决店铺模型版本加载失败
- **问题现象**: “按店铺预测”页面的模型版本下拉框为空。
- **根本原因**: `api.py` 中负责获取店铺模型版本的接口 `get_store_model_versions_api` 仍在使用旧的、不兼容新命名规范的函数来查找模型。
- **修复**: 重写了该接口,使其放弃旧函数,转而使用 `ModelManager` 来进行统一、可靠的模型查找。
### 15:40 - 连锁Bug修复第二环解决店铺预测 `404` 失败
- **问题现象**: 版本列表加载正常后,点击“开始预测”返回 `404` 错误。
- **根本原因**: 后端预测接口 `predict()` 内部的执行函数 `load_model_and_predict` 存在一段过时的、手动的模型文件查找逻辑,它完全绕过了 `ModelManager`,并错误地构建了文件路径。
- **修复 (联合重构)**:
1. **改造 `model_predictor.py`**: 彻底移除了 `load_model_and_predict` 函数内部所有过时的文件查找代码,并修改其函数签名,使其直接接收一个明确的 `model_path` 参数。
2. **改造 `api.py`**: 修改了 `predict` 接口将在API层通过 `ModelManager` 找到的正确模型路径,一路传递到最底层的 `load_model_and_predict` 函数中,确保了调用链的逻辑一致性。
### 15:50 - 连锁Bug修复第三环解决服务启动 `NameError`
- **问题现象**: 在修复预测逻辑后API服务无法启动报错 `NameError: name 'Optional' is not defined`
- **根本原因**: 在修改 `model_predictor.py` 时,使用了 `Optional` 类型提示,但忘记从 `typing` 模块导入。
- **修复**: 在 `server/predictors/model_predictor.py` 文件顶部添加了 `from typing import Optional`
- **最终结论**: 至此所有与“按店铺”功能相关的架构升级和连锁bug均已修复。系统现在能够稳定、正确地处理两种维度的训练和预测任务并且代码逻辑更加统一和健壮。

Binary file not shown.

View File

@ -107,144 +107,10 @@ except AttributeError:
# 数据库连接函数已从 init_multi_store_db 导入
# 新增:店铺训练函数
def train_store_model(store_id, model_type, epochs=50, product_scope='all', product_ids=None):
"""
为特定店铺训练模型
参数:
store_id: 店铺ID
model_type: 模型类型
epochs: 训练轮次
product_scope: 'all' 'specific'
product_ids: 当product_scope为'specific'时的药品列表
"""
try:
print(f"开始店铺训练: store_id={store_id}, model_type={model_type}")
# 获取店铺数据
if product_scope == 'specific' and product_ids:
# 训练指定药品
all_metrics = []
for product_id in product_ids:
print(f"训练店铺 {store_id} 的药品 {product_id}")
# 调用现有的训练函数,但针对特定店铺
# 注意这里需要使用PharmacyPredictor来处理店铺数据
predictor = PharmacyPredictor()
metrics = predictor.train_model(
product_id=product_id,
model_type=model_type,
store_id=store_id,
training_mode='store',
epochs=epochs
)
all_metrics.append(metrics)
# 计算平均指标
if all_metrics:
avg_metrics = {}
for key in all_metrics[0].keys():
if isinstance(all_metrics[0][key], (int, float)):
avg_metrics[key] = sum(m[key] for m in all_metrics) / len(all_metrics)
else:
avg_metrics[key] = all_metrics[0][key] # 非数值字段取第一个
return avg_metrics
else:
return {'error': '没有可训练的药品'}
else:
# 训练所有药品 - 这里可以实现聚合逻辑
# 为简化,暂时使用第一个找到的药品进行训练
from utils.multi_store_data_utils import get_store_product_sales_data
import pandas as pd
# 读取店铺所有数据,找到第一个有数据的药品
try:
from utils.multi_store_data_utils import load_multi_store_data
df = load_multi_store_data()
store_products = df[df['store_id'] == store_id]['product_id'].unique()
if len(store_products) == 0:
return {'error': f'店铺 {store_id} 没有销售数据'}
# 使用第一个药品进行训练(后续可以改进为聚合训练)
first_product = store_products[0]
print(f"使用店铺 {store_id} 的药品 {first_product} 进行训练")
# 使用PharmacyPredictor进行店铺训练
predictor = PharmacyPredictor()
return predictor.train_model(
product_id=first_product,
model_type=model_type,
store_id=store_id,
training_mode='store',
epochs=epochs
)
except Exception as e:
return {'error': f'获取店铺数据失败: {str(e)}'}
except Exception as e:
print(f"店铺训练失败: {str(e)}")
return {'error': str(e)}
# 新增:全局训练函数
def train_global_model(model_type, epochs=50, training_scope='all_stores_all_products',
aggregation_method='sum', store_ids=None, product_ids=None):
"""
训练全局模型
参数:
model_type: 模型类型
epochs: 训练轮次
training_scope: 训练范围
aggregation_method: 聚合方法
store_ids: 选择的店铺列表
product_ids: 选择的药品列表
"""
try:
print(f"开始全局训练: model_type={model_type}, scope={training_scope}, aggregation={aggregation_method}")
from utils.multi_store_data_utils import aggregate_multi_store_data
import pandas as pd
# 读取数据
from utils.multi_store_data_utils import load_multi_store_data
df = load_multi_store_data()
# 根据训练范围过滤数据
if training_scope == 'selected_stores' and store_ids:
df = df[df['store_id'].isin(store_ids)]
elif training_scope == 'selected_products' and product_ids:
df = df[df['product_id'].isin(product_ids)]
elif training_scope == 'custom' and store_ids and product_ids:
df = df[df['store_id'].isin(store_ids) & df['product_id'].isin(product_ids)]
if df.empty:
return {'error': '过滤后没有可用数据'}
# 获取可用的药品
available_products = df['product_id'].unique()
if len(available_products) == 0:
return {'error': '没有可用的药品数据'}
# 选择第一个药品进行全局训练(使用聚合数据)
first_product = available_products[0]
print(f"使用药品 {first_product} 进行全局模型训练")
# 使用PharmacyPredictor进行全局训练
predictor = PharmacyPredictor()
return predictor.train_model(
product_id=first_product,
model_type=model_type,
training_mode='global',
aggregation_method=aggregation_method,
epochs=epochs
)
except Exception as e:
print(f"全局训练失败: {str(e)}")
return {'error': str(e)}
# 注意: train_store_model 和 train_global_model 函数已被废弃。
# 所有训练逻辑已统一整合到 core.predictor.PharmacyPredictor 的 train_model 方法中,
# 通过 training_mode 参数 ('product', 'store', 'global') 进行分发。
# 这种重构确保了代码的单一职责和逻辑的集中管理。
# 初始化数据库
def init_db():
@ -1091,15 +957,10 @@ def start_training():
logger.info(f"📋 任务详情: 训练 {model_type} 模型 - {scope_msg}, 轮次: {epochs}")
# 根据训练模式生成版本号和模型标识
if training_mode == 'product':
model_identifier = product_id
version = get_next_model_version(product_id, model_type) if version is None else version
elif training_mode == 'store':
model_identifier = f"store_{store_id}"
version = get_next_model_version(f"store_{store_id}", model_type) if version is None else version
elif training_mode == 'global':
model_identifier = "global"
version = get_next_model_version("global", model_type) if version is None else version
# v2版模型标识符的生成已移至 core.predictor.py此处不再需要
# 版本号的生成已移至 utils.model_manager.py此处不再需要
model_identifier = "deprecated"
version = "deprecated"
thread_safe_print(f"🏷️ 版本信息: 版本号 {version}, 模型标识: {model_identifier}", "[VERSION]")
logger.info(f"🏷️ 版本信息: 版本号 {version}, 模型标识: {model_identifier}")
@ -1169,25 +1030,8 @@ def start_training():
thread_safe_print(f"✅ 训练器返回结果: {type(metrics)}", "[RESULT]")
logger.info(f"✅ 训练器返回结果: {type(metrics)}")
elif training_mode == 'store':
# 按店铺训练 - 需要新的训练逻辑
metrics = train_store_model(
store_id=store_id,
model_type=model_type,
epochs=epochs,
product_scope=kwargs.get('product_scope', 'all'),
product_ids=kwargs.get('product_ids', [])
)
elif training_mode == 'global':
# 全局训练 - 需要新的训练逻辑
metrics = train_global_model(
model_type=model_type,
epochs=epochs,
training_scope=kwargs.get('training_scope', 'all_stores_all_products'),
aggregation_method=kwargs.get('aggregation_method', 'sum'),
store_ids=kwargs.get('store_ids', []),
product_ids=kwargs.get('product_ids', [])
)
# 注意: training_mode 的分发逻辑已移至 core.predictor.py
# 此处的 elif training_mode == 'store' 和 'global' 分支已废弃
thread_safe_print(f"📈 训练完成! 结果类型: {type(metrics)}", "[COMPLETE]")
if metrics:
@ -1519,19 +1363,17 @@ def predict():
product_id = data.get('product_id')
store_id = data.get('store_id')
# v2版根据训练模式和ID构建模型标识符
aggregation_method = data.get('aggregation_method', 'sum') # 全局模式需要
if training_mode == 'global':
# 全局模式:使用硬编码的标识符,并为预测函数设置占位符
model_identifier = "global_all_products_sum"
product_id = 'all_products'
product_name = "全局聚合数据"
model_identifier = f"global_{aggregation_method}"
product_name = f"全局聚合数据 ({aggregation_method})"
elif training_mode == 'store':
# 店铺模式验证store_id并构建标识符
if not store_id:
return jsonify({"status": "error", "error": "店铺模式需要 store_id"}), 400
model_identifier = f"store_{store_id}"
product_name = f"店铺 {store_id} 整体"
else: # 默认为 'product' 模式
# 药品模式验证product_id并构建标识符
if not product_id:
return jsonify({"status": "error", "error": "药品模式需要 product_id"}), 400
model_identifier = product_id
@ -1549,15 +1391,41 @@ def predict():
if not version:
return jsonify({"status": "error", "error": f"未找到标识符为 {model_identifier}{model_type} 类型模型"}), 404
# 检查模型文件是否存在
model_file_path = get_model_file_path(model_identifier, model_type, version)
if not os.path.exists(model_file_path):
return jsonify({"status": "error", "error": f"未找到模型文件: {model_file_path}"}), 404
# v2版使用 ModelManager 查找模型文件,不再使用旧的 get_model_file_path
from utils.model_manager import model_manager
# 智能修正 training_mode (兼容前端可能发送的错误模式)
if model_identifier.startswith('store_'):
training_mode = 'store'
store_id = model_identifier.split('_')[1]
elif model_identifier.startswith('global_'):
training_mode = 'global'
# 使用 model_manager 查找模型
models_result = model_manager.list_models(
model_type=model_type,
store_id=store_id if training_mode == 'store' else None,
product_id=product_id if training_mode == 'product' else None,
training_mode=training_mode
)
found_model = None
for model in models_result.get('models', []):
if model.get('version') == version:
found_model = model
break
if not found_model or not found_model.get('file_path'):
error_msg = f"在系统中未找到匹配的模型: mode={training_mode}, type={model_type}, id='{model_identifier}', version={version}"
print(error_msg)
return jsonify({"status": "error", "error": error_msg}), 404
model_file_path = found_model['file_path']
model_id = f"{model_identifier}_{model_type}_{version}"
# 执行预测
prediction_result = run_prediction(model_type, product_id, model_id, future_days, start_date, version, store_id, training_mode)
# 执行预测 (v2版传递 model_file_path)
prediction_result = run_prediction(model_type, product_id, model_id, future_days, start_date, version, store_id, training_mode, model_file_path)
if prediction_result is None:
return jsonify({"status": "error", "error": "模型文件未找到或加载失败"}), 404
@ -2716,13 +2584,16 @@ def get_product_name(product_id):
print(f"获取产品名称失败: {str(e)}")
return None
# 执行预测的辅助函数
def run_prediction(model_type, product_id, model_id, future_days, start_date, version=None, store_id=None, training_mode='product'):
# 执行预测的辅助函数 (v2版)
def run_prediction(model_type, product_id, model_id, future_days, start_date, version=None, store_id=None, training_mode='product', model_path=None):
"""执行模型预测"""
try:
scope_msg = f", store_id={store_id}" if store_id else ", 全局模型"
print(f"开始运行预测: model_type={model_type}, product_id={product_id}, model_id={model_id}, version={version}{scope_msg}")
if not model_path:
raise ValueError("run_prediction v2版需要一个明确的 model_path。")
# 创建预测器实例
predictor = PharmacyPredictor()
@ -2731,15 +2602,17 @@ def run_prediction(model_type, product_id, model_id, future_days, start_date, ve
if model_type == 'optimized_kan':
predictor_model_type = 'optimized_kan'
# 生成预测
prediction_result = predictor.predict(
# 生成预测 (v2版直接调用 load_model_and_predict)
prediction_result = load_model_and_predict(
model_path=model_path,
product_id=product_id,
model_type=predictor_model_type,
store_id=store_id,
future_days=future_days,
start_date=start_date,
version=version,
training_mode=training_mode
training_mode=training_mode,
analyze_result=True # 默认进行分析
)
if prediction_result is None:
@ -3805,11 +3678,19 @@ def get_model_versions_api(product_id, model_type):
@app.route('/api/models/store/<store_id>/<model_type>/versions', methods=['GET'])
def get_store_model_versions_api(store_id, model_type):
"""获取店铺模型版本列表API"""
"""获取店铺模型版本列表API (v2版使用ModelManager)"""
try:
model_identifier = f"store_{store_id}"
versions = get_model_versions(model_identifier, model_type)
latest_version = get_latest_model_version(model_identifier, model_type)
from utils.model_manager import model_manager
result = model_manager.list_models(
store_id=store_id,
model_type=model_type,
training_mode='store'
)
models = result.get('models', [])
versions = sorted(list(set(m['version'] for m in models)), key=lambda v: (v != 'best', v))
latest_version = versions[0] if versions else None
return jsonify({
"status": "success",
@ -3826,18 +3707,28 @@ def get_store_model_versions_api(store_id, model_type):
@app.route('/api/models/global/<model_type>/versions', methods=['GET'])
def get_global_model_versions_api(model_type):
"""获取全局模型版本列表API"""
"""获取全局模型版本列表API (v2版使用ModelManager)"""
try:
# 全局模型的标识符是在训练时确定的,例如 'global_all_products_sum'
# 这里我们假设前端请求的是默认的全局模型
model_identifier = "global_all_products_sum"
versions = get_model_versions(model_identifier, model_type)
latest_version = get_latest_model_version(model_identifier, model_type)
from utils.model_manager import model_manager
aggregation_method = request.args.get('aggregation_method')
result = model_manager.list_models(
model_type=model_type,
training_mode='global'
)
models = result.get('models', [])
if aggregation_method:
models = [m for m in models if m.get('aggregation_method') == aggregation_method]
versions = sorted(list(set(m['version'] for m in models)), key=lambda v: (v != 'best', v))
latest_version = versions[0] if versions else None
return jsonify({
"status": "success",
"data": {
"model_type": model_type,
"aggregation_method": aggregation_method,
"versions": versions,
"latest_version": latest_version
}

View File

@ -177,12 +177,14 @@ class PharmacyPredictor:
log_message(f"不支持的训练模式: {training_mode}", 'error')
return None
# 根据训练模式构建模型标识符
# 根据训练模式构建模型标识符 (v2 修正)
if training_mode == 'store':
model_identifier = product_id
# 店铺模型的标识符只应基于店铺ID
model_identifier = f"store_{store_id}"
elif training_mode == 'global':
model_identifier = f"global_{product_id}_{aggregation_method}"
else:
# 全局模型的标识符不应依赖于单个product_id
model_identifier = f"global_{aggregation_method}"
else: # product mode
model_identifier = product_id
# 调用相应的训练函数
@ -190,8 +192,8 @@ class PharmacyPredictor:
log_message(f"🤖 开始调用 {model_type} 训练器")
if model_type == 'transformer':
model_result, metrics, actual_version = train_product_model_with_transformer(
product_id=product_id,
model_identifier=model_identifier,
product_id=product_id, # product_id 仍然需要,用于数据过滤
model_identifier=model_identifier, # 这是用于保存模型的唯一ID
product_df=product_data,
store_id=store_id,
training_mode=training_mode,
@ -209,7 +211,7 @@ class PharmacyPredictor:
elif model_type == 'mlstm':
_, metrics, _, _ = train_product_model_with_mlstm(
product_id=product_id,
model_identifier=model_identifier,
model_identifier=model_identifier, # 传递修正后的ID
product_df=product_data,
store_id=store_id,
training_mode=training_mode,
@ -225,7 +227,7 @@ class PharmacyPredictor:
elif model_type == 'kan':
_, metrics = train_product_model_with_kan(
product_id=product_id,
model_identifier=model_identifier,
model_identifier=model_identifier, # 传递修正后的ID
product_df=product_data,
store_id=store_id,
training_mode=training_mode,
@ -239,7 +241,7 @@ class PharmacyPredictor:
elif model_type == 'optimized_kan':
_, metrics = train_product_model_with_kan(
product_id=product_id,
model_identifier=model_identifier,
model_identifier=model_identifier, # 传递修正后的ID
product_df=product_data,
store_id=store_id,
training_mode=training_mode,
@ -253,7 +255,7 @@ class PharmacyPredictor:
elif model_type == 'tcn':
_, metrics, _, _ = train_product_model_with_tcn(
product_id=product_id,
model_identifier=model_identifier,
model_identifier=model_identifier, # 传递修正后的ID
product_df=product_data,
store_id=store_id,
training_mode=training_mode,
@ -311,13 +313,13 @@ class PharmacyPredictor:
返回:
预测结果和分析如果analyze_result为True
"""
# 根据训练模式构建模型标识符
# 根据训练模式构建模型标识符 (v2 修正)
if training_mode == 'store' and store_id:
# 修正店铺模型的标识符应该只基于店铺ID
model_identifier = f"store_{store_id}"
elif training_mode == 'global':
model_identifier = f"global_{product_id}_{aggregation_method}"
else:
# 全局模型的标识符不应依赖于单个product_id
model_identifier = f"global_{aggregation_method}"
else: # product mode
model_identifier = product_id
return load_model_and_predict(

View File

@ -10,6 +10,7 @@ from datetime import datetime, timedelta
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
import sklearn.preprocessing._data # 添加这一行以支持MinMaxScaler的反序列化
from typing import Optional
from models.transformer_model import TimeSeriesTransformer
from models.slstm_model import sLSTM as ScalarLSTM
@ -23,77 +24,26 @@ from utils.visualization import plot_prediction_results
from utils.multi_store_data_utils import get_store_product_sales_data, aggregate_multi_store_data
from core.config import DEVICE, get_model_file_path, DEFAULT_DATA_PATH
def load_model_and_predict(product_id, model_type, store_id=None, future_days=7, start_date=None, analyze_result=False, version=None, training_mode='product'):
def load_model_and_predict(model_path: str, product_id: str, model_type: str, store_id: Optional[str] = None, future_days: int = 7, start_date: Optional[str] = None, analyze_result: bool = False, version: Optional[str] = None, training_mode: str = 'product'):
"""
加载已训练的模型并进行预测
加载已训练的模型并进行预测 (v2版)
参数:
model_path: 模型的准确文件路径
product_id: 产品ID
model_type: 模型类型 ('transformer', 'mlstm', 'kan', 'tcn', 'optimized_kan')
store_id: 店铺ID为None时使用全局模型
model_type: 模型类型
store_id: 店铺ID
future_days: 预测未来天数
start_date: 预测起始日期如果为None则使用最后一个已知日期
start_date: 预测起始日期
analyze_result: 是否分析预测结果
version: 模型版本如果为None则使用最新版本
version: 模型版本
training_mode: 训练模式
返回:
预测结果和分析如果analyze_result为True
预测结果和分析
"""
try:
# 确定模型文件路径(支持多店铺)
model_path = None
if version:
# 使用版本管理系统获取正确的文件路径
model_path = get_model_file_path(product_id, model_type, version)
else:
# 根据store_id确定搜索目录
if store_id:
# 查找特定店铺的模型
possible_dirs = [
os.path.join('saved_models', model_type, store_id),
os.path.join('models', model_type, store_id)
]
else:
# 查找全局模型
possible_dirs = [
os.path.join('saved_models', model_type, 'global'),
os.path.join('models', model_type, 'global'),
os.path.join('saved_models', model_type), # 后向兼容
'saved_models' # 最基本的目录
]
# 文件名模式
model_suffix = '_optimized' if model_type == 'optimized_kan' else ''
file_model_type = 'kan' if model_type == 'optimized_kan' else model_type
possible_names = [
f"{product_id}_{model_type}_v1_model.pt", # 新多店铺格式
f"{product_id}_{model_type}_v1_global_model.pt", # 全局模型格式
f"{product_id}_{model_type}_v1.pth", # 旧版本格式
f"{file_model_type}{model_suffix}_model_product_{product_id}.pth", # 原始格式
f"{model_type}_model_product_{product_id}.pth" # 简化格式
]
# 搜索模型文件
for dir_path in possible_dirs:
if not os.path.exists(dir_path):
continue
for name in possible_names:
test_path = os.path.join(dir_path, name)
if os.path.exists(test_path):
model_path = test_path
break
if model_path:
break
if not model_path:
scope_msg = f"店铺 {store_id}" if store_id else "全局"
print(f"找不到产品 {product_id}{model_type} 模型文件 ({scope_msg})")
print(f"搜索目录: {possible_dirs}")
return None
print(f"尝试加载模型文件: {model_path}")
print(f"v2版预测函数启动直接使用模型路径: {model_path}")
if not os.path.exists(model_path):
print(f"模型文件 {model_path} 不存在")

View File

@ -255,7 +255,7 @@ def train_product_model_with_kan(product_id, model_identifier, product_df=None,
from utils.model_manager import model_manager
model_manager.save_model(
model_data=best_model_data,
product_id=model_identifier,
product_id=model_identifier, # 修正:使用唯一的标识符
model_type=model_type_name,
store_id=store_id,
training_mode=training_mode,
@ -338,7 +338,7 @@ def train_product_model_with_kan(product_id, model_identifier, product_df=None,
# 保存最终模型,让 model_manager 自动处理版本号
final_model_path, final_version = model_manager.save_model(
model_data=model_data,
product_id=model_identifier,
product_id=model_identifier, # 修正:使用唯一的标识符
model_type=model_type_name,
store_id=store_id,
training_mode=training_mode,

View File

@ -367,7 +367,7 @@ def train_product_model_with_mlstm(
best_loss = test_loss
model_manager.save_model(
model_data=checkpoint_data,
product_id=model_identifier,
product_id=model_identifier, # 修正:使用唯一的标识符
model_type='mlstm',
store_id=store_id,
training_mode=training_mode,
@ -491,7 +491,7 @@ def train_product_model_with_mlstm(
# 保存最终模型,让 model_manager 自动处理版本号
final_model_path, final_version = model_manager.save_model(
model_data=final_model_data,
product_id=model_identifier,
product_id=model_identifier, # 修正:使用唯一的标识符
model_type='mlstm',
store_id=store_id,
training_mode=training_mode,

View File

@ -271,7 +271,7 @@ def train_product_model_with_tcn(
best_loss = test_loss
model_manager.save_model(
model_data=checkpoint_data,
product_id=model_identifier,
product_id=model_identifier, # 修正:使用唯一的标识符
model_type='tcn',
store_id=store_id,
training_mode=training_mode,
@ -356,7 +356,7 @@ def train_product_model_with_tcn(
final_model_path, final_version = model_manager.save_model(
model_data=final_model_data,
product_id=model_identifier,
product_id=model_identifier, # 修正:使用唯一的标识符
model_type='tcn',
store_id=store_id,
training_mode=training_mode,

View File

@ -291,7 +291,7 @@ def train_product_model_with_transformer(
best_loss = test_loss
model_manager.save_model(
model_data=checkpoint_data,
product_id=model_identifier,
product_id=model_identifier, # 修正:使用唯一的标识符
model_type='transformer',
store_id=store_id,
training_mode=training_mode,
@ -382,7 +382,7 @@ def train_product_model_with_transformer(
final_model_path, final_version = model_manager.save_model(
model_data=final_model_data,
product_id=model_identifier,
product_id=model_identifier, # 修正:使用唯一的标识符
model_type='transformer',
store_id=store_id,
training_mode=training_mode,

View File

@ -25,14 +25,15 @@ class ModelManager:
if not os.path.exists(self.model_dir):
os.makedirs(self.model_dir)
def _get_next_version(self, product_id: str, model_type: str, store_id: Optional[str] = None, training_mode: str = 'product') -> int:
def _get_next_version(self, model_type: str, product_id: Optional[str] = None, store_id: Optional[str] = None, training_mode: str = 'product', aggregation_method: Optional[str] = None) -> int:
"""获取下一个模型版本号 (纯数字)"""
search_pattern = self.generate_model_filename(
product_id=product_id,
model_type=model_type,
version='v*',
product_id=product_id,
store_id=store_id,
training_mode=training_mode
training_mode=training_mode,
aggregation_method=aggregation_method
)
full_search_path = os.path.join(self.model_dir, search_pattern)
@ -47,27 +48,29 @@ class ModelManager:
return max_version + 1
def generate_model_filename(self,
product_id: str,
model_type: str,
version: str,
store_id: Optional[str] = None,
training_mode: str = 'product',
product_id: Optional[str] = None,
store_id: Optional[str] = None,
aggregation_method: Optional[str] = None) -> str:
"""
生成统一的模型文件名
格式规范:
格式规范 (v2):
- 产品模式: {model_type}_product_{product_id}_{version}.pth
- 店铺模式: {model_type}_store_{store_id}_{product_id}_{version}.pth
- 全局模式: {model_type}_global_{product_id}_{aggregation_method}_{version}.pth
- 店铺模式: {model_type}_store_{store_id}_{version}.pth
- 全局模式: {model_type}_global_{aggregation_method}_{version}.pth
"""
if training_mode == 'store' and store_id:
return f"{model_type}_store_{store_id}_{product_id}_{version}.pth"
return f"{model_type}_store_{store_id}_{version}.pth"
elif training_mode == 'global' and aggregation_method:
return f"{model_type}_global_{product_id}_{aggregation_method}_{version}.pth"
else:
# 默认产品模式
return f"{model_type}_global_{aggregation_method}_{version}.pth"
elif training_mode == 'product' and product_id:
return f"{model_type}_product_{product_id}_{version}.pth"
else:
# 提供一个后备或抛出错误,以避免生成无效文件名
raise ValueError(f"无法为训练模式 '{training_mode}' 生成有效的文件名缺少必需的ID。")
def save_model(self,
model_data: dict,
@ -89,13 +92,24 @@ class ModelManager:
(模型文件路径, 使用的版本号)
"""
if version is None:
next_version_num = self._get_next_version(product_id, model_type, store_id, training_mode)
next_version_num = self._get_next_version(
model_type=model_type,
product_id=product_id,
store_id=store_id,
training_mode=training_mode,
aggregation_method=aggregation_method
)
version_str = f"v{next_version_num}"
else:
version_str = version
filename = self.generate_model_filename(
product_id, model_type, version_str, store_id, training_mode, aggregation_method
model_type=model_type,
version=version_str,
training_mode=training_mode,
product_id=product_id,
store_id=store_id,
aggregation_method=aggregation_method
)
# 统一保存到根目录,避免复杂的子目录结构
@ -250,127 +264,65 @@ class ModelManager:
def parse_model_filename(self, filename: str) -> Optional[Dict]:
"""
解析模型文件名提取模型信息
解析模型文件名提取模型信息 (v2版)
支持的格式:
- {model_type}_product_{product_id}_{version}.pth
- {model_type}_store_{store_id}_{product_id}_{version}.pth
- {model_type}_global_{product_id}_{aggregation_method}_{version}.pth
- 旧格式兼容
- 产品: {model_type}_product_{product_id}_{version}.pth
- 店铺: {model_type}_store_{store_id}_{version}.pth
- 全局: {model_type}_global_{aggregation_method}_{version}.pth
"""
if not filename.endswith('.pth'):
return None
base_name = filename.replace('.pth', '')
parts = base_name.split('_')
if len(parts) < 3:
return None # 格式不符合基本要求
model_type = parts[0]
mode = parts[1]
try:
# 新格式解析
if '_product_' in base_name:
# 产品模式: model_type_product_product_id_version
parts = base_name.split('_product_')
model_type = parts[0]
rest = parts[1]
# 分离产品ID和版本
if '_v' in rest:
last_v_index = rest.rfind('_v')
product_id = rest[:last_v_index]
version = rest[last_v_index+1:]
else:
product_id = rest
version = 'v1'
if mode == 'store' and len(parts) >= 3:
# {model_type}_store_{store_id}_{version}
version = parts[-1]
store_id = '_'.join(parts[2:-1])
return {
'model_type': model_type,
'training_mode': 'store',
'store_id': store_id,
'version': version,
'product_id': None,
'aggregation_method': None
}
elif mode == 'global' and len(parts) >= 3:
# {model_type}_global_{aggregation_method}_{version}
version = parts[-1]
aggregation_method = '_'.join(parts[2:-1])
return {
'model_type': model_type,
'training_mode': 'global',
'aggregation_method': aggregation_method,
'version': version,
'product_id': None,
'store_id': None
}
elif mode == 'product' and len(parts) >= 3:
# {model_type}_product_{product_id}_{version}
version = parts[-1]
product_id = '_'.join(parts[2:-1])
return {
'model_type': model_type,
'training_mode': 'product',
'product_id': product_id,
'version': version,
'training_mode': 'product',
'store_id': None,
'aggregation_method': None
}
elif '_store_' in base_name:
# 店铺模式: model_type_store_store_id_product_id_version
parts = base_name.split('_store_')
model_type = parts[0]
rest = parts[1]
# 分离店铺ID、产品ID和版本
rest_parts = rest.split('_')
if len(rest_parts) >= 3:
store_id = rest_parts[0]
if rest_parts[-1].startswith('v'):
# 最后一部分是版本号
version = rest_parts[-1]
product_id = '_'.join(rest_parts[1:-1])
else:
version = 'v1'
product_id = '_'.join(rest_parts[1:])
return {
'model_type': model_type,
'product_id': product_id,
'version': version,
'training_mode': 'store',
'store_id': store_id,
'aggregation_method': None
}
elif '_global_' in base_name:
# 全局模式: model_type_global_product_id_aggregation_method_version
parts = base_name.split('_global_')
model_type = parts[0]
rest = parts[1]
rest_parts = rest.split('_')
if len(rest_parts) >= 3:
if rest_parts[-1].startswith('v'):
# 最后一部分是版本号
version = rest_parts[-1]
aggregation_method = rest_parts[-2]
product_id = '_'.join(rest_parts[:-2])
else:
version = 'v1'
aggregation_method = rest_parts[-1]
product_id = '_'.join(rest_parts[:-1])
return {
'model_type': model_type,
'product_id': product_id,
'version': version,
'training_mode': 'global',
'store_id': None,
'aggregation_method': aggregation_method
}
# 兼容旧格式
else:
# 尝试解析其他格式
if 'model_product_' in base_name:
parts = base_name.split('_model_product_')
model_type = parts[0]
product_part = parts[1]
if '_v' in product_part:
last_v_index = product_part.rfind('_v')
product_id = product_part[:last_v_index]
version = product_part[last_v_index+1:]
else:
product_id = product_part
version = 'v1'
return {
'model_type': model_type,
'product_id': product_id,
'version': version,
'training_mode': 'product',
'store_id': None,
'aggregation_method': None
}
except Exception as e:
print(f"解析文件名失败 {filename}: {e}")
print(f"解析新版v2文件名失败 {filename}: {e}")
return None
def delete_model(self, model_file: str) -> bool:

View File

@ -397,68 +397,70 @@ npm run dev
至此,一个完整的“训练->预测->展示”的调用链路就完成了。
## 5. 模型保存规则与路径
## 5. 模型保存与版本管理核心逻辑 (重构后)
为了确保模型的唯一性、可追溯性和可复现性,系统采用了一套严格的文件保存和命名规则。所有相关的逻辑都集中在 [`server/core/config.py`](server/core/config.py:1) 中。
为了根治版本混乱和模型加载失败的问题,系统进行了一项重要的重构。现在,所有与模型保存、命名和版本管理相关的逻辑都已**统一集中**到 [`server/utils/model_manager.py`](server/utils/model_manager.py:1) 的 `ModelManager`中。
### 5.1. 统一保存目录
### 5.1. 统一管理者:`ModelManager`
所有训练产物包括模型权重、配置和数据缩放器Scalers都保存在项目根目录下的 `saved_models/` 文件夹中。
- **单一职责**: `ModelManager` 是系统中唯一负责处理模型文件IO的组件。所有训练器 (`trainer`) 在需要保存模型时,都必须通过它来进行。
- **核心功能**:
1. **自动版本控制**: 自动生成和递增符合规范的版本号。
2. **统一命名**: 根据模型的元数据算法类型、训练模式、ID等生成标准化的文件名。
3. **安全保存**: 将模型数据和元数据一起打包保存到 `.pth` 文件中。
4. **可靠检索**: 提供统一的接口来列出和查找模型。
- **路径**: `PROJECT_ROOT/saved_models/`
- **定义**: 该路径由 [`server/core/config.py`](server/core/config.py:1) 中的 `DEFAULT_MODEL_DIR` 变量指定。
### 5.2. 统一版本规范
### 5.2. 文件命名规范
所有模型版本现在都遵循一个严格的、可预测的格式:
模型文件的命名遵循一个标准化的格式,以便在预测时能够被精确地定位和加载。该命名逻辑由 [`get_model_file_path()`](server/core/config.py:136) 函数统一管理。
- **数字版本**: `v{数字}`,例如 `v1`, `v2`, `v3`...
- **生成**: 当一次训练**正常完成**时,`ModelManager` 会自动计算出当前模型的下一个可用版本号(例如,如果已存在 `v1``v2`,则新版本为 `v3`),并以此命名最终的模型文件。
- **用途**: 代表一次完整的、稳定的训练产出。
- **特殊版本**: `best`
- **生成**: 在训练过程中,如果某个 `epoch` 产生的模型在验证集上的性能超过了之前所有 `epoch`,训练器会调用 `ModelManager` 将这个模型保存为 `best` 版本,覆盖掉旧的 `best` 模型。
- **用途**: 始终指向该模型迄今为止性能最佳的一个版本,便于快速进行高质量的预测。
**命名格式**: `{model_type}_{model_identifier}_epoch_{version}.pth`
### 5.3. 统一命名约定 (v2版)
**各部分说明**:
随着系统增加了“按店铺”和“全局”训练模式,`ModelManager``generate_model_filename` 方法也已升级,以支持更丰富的、无歧义的命名格式:
- `{model_type}`: 模型的算法类型。例如:`transformer`, `mlstm`, `tcn`, `kan`
- `{model_identifier}`: 模型的唯一业务标识符,它根据训练模式(`training_mode`)动态生成:
- **按药品训练 (`product`)**: 标识符就是 `product_id`
- *示例*: `transformer_17002608_epoch_best.pth`
- **按店铺训练 (`store`)**: 标识符是 `store_{store_id}`
- *示例*: `tcn_store_01010023_epoch_best.pth`
- **全局训练 (`global`)**: 标识符是固定的字符串 `'global'`
- *示例*: `mlstm_global_epoch_best.pth`
- `{version}`: 模型的版本。在训练过程中,通常会保存两个版本:
- `best`: 在验证集上表现最佳的模型。
- `{epoch_number}`: 训练完成时的最终模型,例如 `50`
前端的“版本”下拉框中显示的就是这些版本字符串。
- **药品模型**: `{model_type}_product_{product_id}_{version}.pth`
- *示例*: `transformer_product_17002608_best.pth`
- **店铺模型**: `{model_type}_store_{store_id}_{version}.pth`
- *示例*: `mlstm_store_01010023_v2.pth`
- **全局模型**: `{model_type}_global_{aggregation_method}_{version}.pth`
- *示例*: `tcn_global_sum_v1.pth`
### 5.3. Checkpoint文件内容
这个新的命名系统确保了不同训练模式产出的模型可以清晰地被识别和管理。
每个 `.pth` 文件都是一个PyTorch Checkpoint它是一个Python字典包含了重建和使用模型所需的所有信息。这是确保预测与训练环境一致的关键。
### 5.4. Checkpoint文件内容 (结构不变)
**Checkpoint结构**:
每个 `.pth` 文件依然是一个包含模型权重、完整配置和数据缩放器的PyTorch Checkpoint。重构加强了**所有训练器都必须将完整的配置信息存入 `config` 字典**这一规则,确保了模型的完全可复现性。
```python
checkpoint = {
# 1. 模型权重
'model_state_dict': model.state_dict(),
# 2. 完整的模型配置
'config': {
'input_dim': ...,
'hidden_size': ...,
'num_layers': ...,
'model_type': 'transformer',
# ... 其他所有重建模型所需的超参数 ...
},
# 3. 数据归一化缩放器
'scaler_X': scaler_X, # 用于输入特征
'scaler_y': scaler_y, # 用于目标值(销量)
# 4. (可选) 模型性能指标
'metrics': {'mse': 0.01, 'mae': 0.05, ...}
}
```
### 5.5. 核心优势 (重构后)
**核心优势**:
- **逻辑集中**: 所有版本管理的复杂性都被封装在 `ModelManager` 内部,训练器只需调用 `save_model` 即可,无需关心版本号如何生成。
- **数据一致性**: 由于版本的生成、保存和检索都由同一个组件以同一种逻辑处理,从根本上杜绝了因命名或版本格式不匹配导致“模型未找到”的问题。
- **易于维护**: 未来如果需要修改版本策略或命名规则,只需修改 `ModelManager` 一个文件即可,无需改动所有训练器。
- **可复现性**: 通过保存完整的 `config`我们可以在预测时精确地重建出与训练时结构完全相同的模型实例避免了因模型结构不匹配导致的加载失败这是之前修复的一个核心BUG
- **数据一致性**: 保存 `scaler_X``scaler_y` 确保了在预测时使用与训练时完全相同的归一化/反归一化逻辑,保证了预测结果的正确性。
## 6. 核心流程的演进:支持店铺与全局模式
在最初的“按药品”流程基础上系统已重构以支持“按店铺”和“全局”的完整AI闭环。这引入了一些关键的逻辑变化
### 6.1. 训练流程的变化
- **统一入口**: 所有训练请求(药品、店铺、全局)都通过 `POST /api/training` 接口,由 `training_mode` 参数区分。
- **数据聚合**: 在 [`predictor.py`](server/core/predictor.py:1) 的 `train_model` 方法中,会根据 `training_mode` 调用 `aggregate_multi_store_data` 函数,为店铺或全局模式准备正确的聚合时间序列数据。
- **模型标识符**: `train_model` 方法现在会生成一个唯一的 `model_identifier`(例如 `product_17002608`, `store_01010023`, `global_sum`),并将其传递给所有下游训练器。这是确保模型被正确命名的关键。
### 6.2. 预测流程的重大修复
预测流程经过了重大修复,以解决之前因逻辑不统一导致的 `404` 错误。
- **废弃旧函数**: `core/config.py` 中的 `get_model_file_path``get_model_versions` 等旧的、有缺陷的辅助函数已被**完全废弃**。
- **统一查找逻辑**: 现在,[`api.py`](server/api.py:1) 的 `predict` 函数**必须**使用 `model_manager.list_models()` 方法来查找模型。
- **可靠的路径传递**: `predict` 函数找到正确的模型文件路径后,会将其作为一个参数,一路传递给 `run_prediction` 和最终的 `load_model_and_predict` 函数。
- **根除缺陷**: `load_model_and_predict` 函数内部所有手动的、过时的文件查找逻辑已被**完全移除**。它现在只负责接收一个明确的路径并加载模型。
这个修复确保了整个预测链路都依赖于 `ModelManager` 这一个“单一事实来源”,从根本上解决了因路径不匹配导致的预测失败问题。

View File

@ -73,7 +73,7 @@
3. **业务逻辑层**: `api.py` 调用 `core/predictor.py` 中的 `predict` 方法,将参数传递下去。这一层是业务的“调度中心”。
4. **模型层**: `core/predictor.py` 最终调用 `predictors/model_predictor.py` 中的 `load_model_and_predict` 函数。
5. **模型加载与执行**:
* 根据参数在 `saved_models/` 目录下找到对应的模型文件(例如 `transformer_17002608_epoch_best.pth`)。
* 根据参数在 `saved_models/` 目录下找到对应的模型文件(例如 `transformer_store_01010023_best.pth` 或 `mlstm_product_17002608_v3.pth`)。
* 加载文件,从中恢复出 **模型结构**、**模型权重** 和 **数据缩放器**
* 准备最新的历史数据作为输入,执行预测。
* 将预测结果返回。