From 28bae357835a5015dedd5b8952baccb28d0064e2 Mon Sep 17 00:00:00 2001 From: xz2000 Date: Mon, 21 Jul 2025 16:38:36 +0800 Subject: [PATCH] =?UTF-8?q?#=20=E6=89=81=E5=B9=B3=E5=8C=96=E6=A8=A1?= =?UTF-8?q?=E5=9E=8B=E6=95=B0=E6=8D=AE=E5=A4=84=E7=90=86=E8=A7=84=E8=8C=83?= =?UTF-8?q?=20(=E6=9C=80=E7=BB=88=E7=89=88)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit **版本**: 4.0 (最终版) **核心思想**: 逻辑路径被转换为文件名的一部分,实现极致扁平化的文件存储。 --- ## 一、 文件保存规则 ### 1.1. 核心原则 所有元数据都被编码到文件名中。一个逻辑上的层级路径(例如 `product/P001_all/mlstm/v2`)应该被转换为一个用下划线连接的文件名前缀(`product_P001_all_mlstm_v2`)。 ### 1.2. 文件存储位置 - **最终产物**: 所有最终模型、元数据文件、损失图等,统一存放在 `saved_models/` 根目录下。 - **过程文件**: 所有训练过程中的检查点文件,统一存放在 `saved_models/checkpoints/` 目录下。 ### 1.3. 文件名生成规则 1. **构建逻辑路径**: 根据训练参数(模式、范围、类型、版本)确定逻辑路径。 - *示例*: `product/P001_all/mlstm/v2` 2. **生成文件名前缀**: 将逻辑路径中的所有 `/` 替换为 `_`。 - *示例*: `product_P001_all_mlstm_v2` 3. **拼接文件后缀**: 在前缀后加上描述文件类型的后缀。 - `_model.pth` - `_metadata.json` - `_loss_curve.png` - `_checkpoint_best.pth` - `_checkpoint_epoch_{N}.pth` #### **完整示例:** - **最终模型**: `saved_models/product_P001_all_mlstm_v2_model.pth` - **元数据**: `saved_models/product_P001_all_mlstm_v2_metadata.json` - **最佳检查点**: `saved_models/checkpoints/product_P001_all_mlstm_v2_checkpoint_best.pth` - **Epoch 50 检查点**: `saved_models/checkpoints/product_P001_all_mlstm_v2_checkpoint_epoch_50.pth` --- ## 二、 文件读取规则 1. **确定模型元数据**: 根据需求确定要加载的模型的训练模式、范围、类型和版本。 2. **构建文件名前缀**: 按照与保存时相同的逻辑,将元数据拼接成文件名前缀(例如 `product_P001_all_mlstm_v2`)。 3. **定位文件**: - 要加载最终模型,查找文件: `saved_models/{prefix}_model.pth`。 - 要加载最佳检查点,查找文件: `saved_models/checkpoints/{prefix}_checkpoint_best.pth`。 --- ## 三、 数据库存储规则 数据库用于索引,应存储足以重构文件名前缀的关键元数据。 #### **`models` 表结构建议:** | 字段名 | 类型 | 描述 | 示例 | | :--- | :--- | :--- | :--- | | `id` | INTEGER | 主键 | 1 | | `filename_prefix` | TEXT | **完整文件名前缀,可作为唯一标识** | `product_P001_all_mlstm_v2` | | `model_identifier`| TEXT | 用于版本控制的标识符 (不含版本) | `product_P001_all_mlstm` | | `version` | INTEGER | 版本号 | `2` | | `status` | TEXT | 模型状态 | `completed`, `training`, `failed` | | `created_at` | TEXT | 创建时间 | `2025-07-21 02:29:00` | | `metrics_summary`| TEXT | 关键性能指标的JSON字符串 | `{"rmse": 10.5, "r2": 0.89}` | #### **保存逻辑:** - 训练完成后,向表中插入一条记录。`filename_prefix` 字段是查找与该次训练相关的所有文件的关键。 --- ## 四、 版本记录规则 版本管理依赖于根目录下的 `versions.json` 文件,以实现原子化、线程安全的版本号递增。 - **文件名**: `versions.json` - **位置**: `saved_models/versions.json` - **结构**: 一个JSON对象,`key` 是不包含版本号的标识符,`value` 是该标识符下最新的版本号(整数)。 - **Key**: `{prefix_core}_{model_type}` (例如: `product_P001_all_mlstm`) - **Value**: `Integer` #### **`versions.json` 示例:** ```json { "product_P001_all_mlstm": 2, "store_S001_P002_transformer": 1 } ``` #### **版本管理流程:** 1. **获取新版本**: 开始训练前,构建 `key`。读取 `versions.json`,找到对应 `key` 的 `value`。新版本号为 `value + 1` (若key不存在,则为 `1`)。 2. **更新版本**: 训练成功后,将新的版本号写回到 `versions.json`。此过程**必须使用文件锁**以防止并发冲突。 调试完成药品预测和店铺预测 --- UI/src/components/ProductSelector.vue | 4 +- UI/src/components/StoreSelector.vue | 4 +- .../views/prediction/GlobalPredictionView.vue | 4 +- .../prediction/ProductPredictionView.vue | 13 +- .../views/prediction/StorePredictionView.vue | 16 +- UI/src/views/training/GlobalTrainingView.vue | 4 +- UI/src/views/training/ProductTrainingView.vue | 4 +- UI/src/views/training/StoreTrainingView.vue | 4 +- prediction_history.db | Bin 36864 -> 36864 bytes server/api.py | 375 +++++----- server/core/config.py | 218 +----- server/core/predictor.py | 28 +- server/models/model_manager.py | 696 +----------------- server/predictors/model_predictor.py | 96 +-- server/trainers/kan_trainer.py | 15 +- server/trainers/mlstm_trainer.py | 84 +-- server/trainers/tcn_trainer.py | 33 +- server/trainers/transformer_trainer.py | 76 +- server/utils/data_utils.py | 2 +- server/utils/file_save.py | 128 ++-- server/utils/model_manager.py | 20 +- server/utils/multi_store_data_utils.py | 36 +- server/utils/training_process_manager.py | 77 +- xz训练模型保存规则.md | 209 ++---- 24 files changed, 609 insertions(+), 1537 deletions(-) diff --git a/UI/src/components/ProductSelector.vue b/UI/src/components/ProductSelector.vue index 84cd8f6..7e69938 100644 --- a/UI/src/components/ProductSelector.vue +++ b/UI/src/components/ProductSelector.vue @@ -242,7 +242,7 @@ watch(() => props.storeId, () => { .product-name { font-weight: 500; - color: #303133; + color: var(--el-text-color-primary); margin-bottom: 2px; } @@ -254,7 +254,7 @@ watch(() => props.storeId, () => { } .product-id { - color: #909399; + color: var(--el-text-color-secondary); margin-right: 8px; } diff --git a/UI/src/components/StoreSelector.vue b/UI/src/components/StoreSelector.vue index 2935ee8..1853d55 100644 --- a/UI/src/components/StoreSelector.vue +++ b/UI/src/components/StoreSelector.vue @@ -242,7 +242,7 @@ watch(() => props.filterByStatus, () => { .store-name { font-weight: 500; - color: #303133; + color: var(--el-text-color-primary); margin-bottom: 2px; } @@ -254,7 +254,7 @@ watch(() => props.filterByStatus, () => { } .store-location { - color: #909399; + color: var(--el-text-color-secondary); margin-right: 8px; } diff --git a/UI/src/views/prediction/GlobalPredictionView.vue b/UI/src/views/prediction/GlobalPredictionView.vue index 6d9cb88..eda995e 100644 --- a/UI/src/views/prediction/GlobalPredictionView.vue +++ b/UI/src/views/prediction/GlobalPredictionView.vue @@ -191,7 +191,7 @@ const startPrediction = async () => { start_date: form.start_date, analyze_result: form.analyze_result } - const response = await axios.post('/api/predict', payload) + const response = await axios.post('/api/prediction', payload) if (response.data.status === 'success') { predictionResult.value = response.data.data ElMessage.success('预测完成!') @@ -213,7 +213,7 @@ const renderChart = () => { chart.destroy() } const predictions = predictionResult.value.predictions - const labels = predictions.map(p => p.date) + const labels = predictions.map(p => new Date(p.date).toLocaleDateString('zh-CN', { weekday: 'short', year: 'numeric', month: 'long', day: 'numeric' })) const data = predictions.map(p => p.sales) chart = new Chart(chartCanvas.value, { type: 'line', diff --git a/UI/src/views/prediction/ProductPredictionView.vue b/UI/src/views/prediction/ProductPredictionView.vue index 17c977d..b02ae3d 100644 --- a/UI/src/views/prediction/ProductPredictionView.vue +++ b/UI/src/views/prediction/ProductPredictionView.vue @@ -200,6 +200,15 @@ const handleModelTypeChange = () => { } const startPrediction = async () => { + if (!form.product_id) { + ElMessage.error('请选择目标药品') + return + } + if (!form.model_type) { + ElMessage.error('请选择算法类型') + return + } + try { predicting.value = true const payload = { @@ -210,7 +219,7 @@ const startPrediction = async () => { analyze_result: form.analyze_result, product_id: form.product_id } - const response = await axios.post('/api/predict', payload) + const response = await axios.post('/api/prediction', payload) if (response.data.status === 'success') { predictionResult.value = response.data.data ElMessage.success('预测完成!') @@ -232,7 +241,7 @@ const renderChart = () => { chart.destroy() } const predictions = predictionResult.value.predictions - const labels = predictions.map(p => p.date) + const labels = predictions.map(p => new Date(p.date).toLocaleDateString('zh-CN', { weekday: 'short', year: 'numeric', month: 'long', day: 'numeric' })) const data = predictions.map(p => p.sales) chart = new Chart(chartCanvas.value, { type: 'line', diff --git a/UI/src/views/prediction/StorePredictionView.vue b/UI/src/views/prediction/StorePredictionView.vue index 692a104..4c6b8aa 100644 --- a/UI/src/views/prediction/StorePredictionView.vue +++ b/UI/src/views/prediction/StorePredictionView.vue @@ -200,6 +200,15 @@ const handleModelTypeChange = () => { } const startPrediction = async () => { + if (!form.store_id) { + ElMessage.error('请选择目标店铺') + return + } + if (!form.model_type) { + ElMessage.error('请选择算法类型') + return + } + try { predicting.value = true const payload = { @@ -208,9 +217,10 @@ const startPrediction = async () => { future_days: form.future_days, start_date: form.start_date, analyze_result: form.analyze_result, - store_id: form.store_id + store_id: form.store_id, + training_mode: form.training_mode } - const response = await axios.post('/api/predict', payload) + const response = await axios.post('/api/prediction', payload) if (response.data.status === 'success') { predictionResult.value = response.data.data ElMessage.success('预测完成!') @@ -232,7 +242,7 @@ const renderChart = () => { chart.destroy() } const predictions = predictionResult.value.predictions - const labels = predictions.map(p => p.date) + const labels = predictions.map(p => new Date(p.date).toLocaleDateString('zh-CN', { weekday: 'short', year: 'numeric', month: 'long', day: 'numeric' })) const data = predictions.map(p => p.sales) chart = new Chart(chartCanvas.value, { type: 'line', diff --git a/UI/src/views/training/GlobalTrainingView.vue b/UI/src/views/training/GlobalTrainingView.vue index 98db680..caefef1 100644 --- a/UI/src/views/training/GlobalTrainingView.vue +++ b/UI/src/views/training/GlobalTrainingView.vue @@ -271,11 +271,11 @@

评估指标

{{ JSON.stringify(row.metrics, null, 2) }}
-
+

错误信息

diff --git a/UI/src/views/training/ProductTrainingView.vue b/UI/src/views/training/ProductTrainingView.vue index d7321b3..460bf3c 100644 --- a/UI/src/views/training/ProductTrainingView.vue +++ b/UI/src/views/training/ProductTrainingView.vue @@ -240,11 +240,11 @@

评估指标

{{ JSON.stringify(row.metrics, null, 2) }}
-
+

错误信息

diff --git a/UI/src/views/training/StoreTrainingView.vue b/UI/src/views/training/StoreTrainingView.vue index c5d1666..356884a 100644 --- a/UI/src/views/training/StoreTrainingView.vue +++ b/UI/src/views/training/StoreTrainingView.vue @@ -255,11 +255,11 @@

评估指标

{{ JSON.stringify(row.metrics, null, 2) }}
-
+

错误信息

diff --git a/prediction_history.db b/prediction_history.db index 70679ea09a7bf433ed2703892b5a161859fc2626..b8e736cc60e96510e35d5ac9714d20f10888ad40 100644 GIT binary patch literal 36864 zcmeI4&u=41702!Ta(>wz2@T=LE;3r7O|&Ufzq-51NI;l1t7!9MChKTtBvw{+cQs?k zBr~=XZNw>J7Z7O0azcRM0B1A@R!FR1IqV<635he>S>}K^bEMj7+i_>w>133VW_g`h zX}eyP-}m!gRdrRze(mOZv!l!5e(SF4lv!>em(S`a|)?+}p)Z&3-()W?p~;2mk>f00e*l5C8%|U^Ic_ z&rTN>zqXt|e!aOD=r^0en~l5sf!=K#>Q=kCzt?UkztcR_!!OQU->$9JYvr}gmut7m zsg=sJWpiw8mWOLE-D~X!2Y#o~3|8Wz&e1)+Qf~I%ZMj?V_;ZEA;;Y8DjWpkS4Mo+5 z-(&hb4^LOUn4ht8&rKB;Yb1ZX)pzb*O9xFuYk#lNYrX5#sn_o`+nxQ^QQ?etr)-Am z_FX$=5I2&7@x;CF&rK8-mzVQz-|Q%FHvwlD)nx2Uef1mbwM3}OOWjCBVQdB_-1XWk zwe9ka?X`{7?c3#7Yq!g*JN2!#P1D3iZL_}8x9*8Bwd%E7_44MHdF-sO4=hfch$5F} z5vO2UaoF6|jeDwdC(XcbY1Pp|!>mHme9)a%({Fc+2cGrQurK)K+Ba8s*6U?5FxuH% zySYJ?LS`K?mEut-S$hp%+oJeyi)Ul|)}`E2H zYvX33nT{r#qIoRj8ab;_DiyEIKR5S<*)PqkOtV6DYIU+Uu{ORjR?puu2A4aI4;L14 zJI`;t@Y)yit5FljpLdBXx#S#qwih5SBu9#HjkJqhj6EqSCB&Rj2aAv1|JlQLe*Ujt z{N>R*4<7#LR}X*s;L&^UEq(OCUq62L{fEE#$#SQq_S(_jg>LndsaLh3c6S@6${U9^ z#neXFMU)f^38b*{ky4HhDAytkP4gEP9?em@fr6#Mmygz- z#s3rShoxlDsT9PaiX!d?7;!=fQo;*S;0Yfq<$Asw^gMq`LC2)ZoC+0`b2?gh7U$!j z7n4B`rH>sdY(zubHbHkel3wT|rW}S{M%_S*O9+3Wp!2HDt4vfJW^*b>YtQ0;tbIPI zol))uuFZ{DVY1#t7|B2?gt_f&FT^hO@KtFiDGAT3+^KR=5mYdnjnbH3OO@DcU)yg zcx=Wy&r!DPOBOhn8R5^L+R}wHGqsTlAr2R0bi}hb9}AyO3MVYIu|onxq;e3ac7UX! z3OSTe$BeLM@K=|}_9qceMHM?0CZ)NI7%e=D^Re(kQn;rXH_Pvj8rJ)jGWWQvp63MpGpcBLQBkihHU0_XLSb&$HgcJ17Asr11zql zkF!A^pC6poi4iWUlvJ=|3wF+=mc{uK;oqA)5&lO1DlWAB!1m2ZuFd>yQeBw|o%jyY z4rkc(b|Khh2KFh{HaS47)9S%1G!UA)U}n z>5hjy&O(##n0Q<>-{)78?<2zQRynIM*CxUlEj)|!vGDPvaEJR^*b*b^VzX-T%@w@# zqpMh3DccJ@Eag+rcel!DB}zAfM+?v5d@Ot{DLfRm9bhSt_FRE@Xq%4CiG!%;v4DAj zS+9jxJ;I$$b+cUKn3(n2XyI9$kA>%x!igEfc<3qQyGA|dVTjBsB{XxI>G_`LNk{Rk z7Cy9AGb5Z+Ww;`3bAfkG#Ircx6+Ua$lw0=;$ zZ_Sn7wYc@5`0vXt|DhHT00KY&2mk>f00e*l5C8%|;QyGw()dn3%U#5U$S}(-H!CvC za*L`I8D_bkQ;ZC=-29l2471!Rn2QXv?5xj5hFNx^XS#2lVcU5+GR(5mS%?g?>+Eg2M_=PKmZ5;0U!Vb zfB+Bx0zd!=00AKIj1%~Dew^e_{Udw)*?@&;{ck=100$5N0zd!=00AHX1b_e#00KY& z2mk>f@Cham{r^9#|3AS225CS52mk>f00e*l5C8%|00;m9AOHjmf$r!3OXl-Xth9nsT0t?bU_Py2F0EiTtzagtU^=a!kXA62Rxp`XFp*X; zo>nlnXt2*b20PsUzew_eEf4?#KmZ5;0U!VbfB+Bx0zd!=0D)(d0IdI?&A5TrKmZ5; Y0U!VbfB+Bx0zd!=00AHX1TI40f4?4fi~s-t delta 40 ucmZozz|^pSX@WE(>qHr6M%Il9OZYdlIQ)@c//versions', methods=['GET']) def get_store_model_versions_api(store_id, model_type): - """获取店铺模型版本列表API""" + """获取店铺模型版本列表API - 匹配所有药品范围""" try: - model_identifier = f"store_{store_id}" - versions = get_model_versions(model_identifier, model_type) - latest_version = get_latest_model_version(model_identifier, model_type) + import re + pattern = re.compile(rf"store_{store_id}_.*_{model_type}_v\d+_model\.pth") + model_files = os.listdir(DEFAULT_MODEL_DIR) + + found_versions = [] + for filename in model_files: + if pattern.match(filename): + # 提取文件名前缀作为版本标识 + version_prefix = filename.replace("_model.pth", "") + found_versions.append(version_prefix) + + # 按版本号(数字部分)降序排序 + if found_versions: + sorted_versions = sorted( + found_versions, + key=lambda s: int(s.split('_v')[-1]), + reverse=True + ) + else: + sorted_versions = [] + + latest_version = sorted_versions[0] if sorted_versions else None + return jsonify({ "status": "success", "data": { "store_id": store_id, "model_type": model_type, - "versions": versions, + "versions": sorted_versions, "latest_version": latest_version } }) except Exception as e: - print(f"获取店铺模型版本失败: {str(e)}") + logger.error(f"获取店铺模型版本失败: {str(e)}") + import traceback + traceback.print_exc() return jsonify({"status": "error", "message": str(e)}), 500 @app.route('/api/models/global//versions', methods=['GET']) def get_global_model_versions_api(model_type): """获取全局模型版本列表API""" try: - model_identifier = "global" - versions = get_model_versions(model_identifier, model_type) - latest_version = get_latest_model_version(model_identifier, model_type) - + # TODO: 此处需要更新为从数据库或文件系统扫描获取版本信息 + # 暂时返回模拟数据 return jsonify({ "status": "success", "data": { "model_type": model_type, - "versions": versions, - "latest_version": latest_version + "versions": ["v1"], + "latest_version": "v1" } }) except Exception as e: @@ -3896,8 +3894,8 @@ def retrain_model(): else: return jsonify({'error': '无效的训练模式'}), 400 - # 生成新版本号 - new_version = get_next_model_version(model_identifier, model_type) + # 生成新版本号 - 此逻辑已废弃,应通过 path_manager 处理 + new_version = "v_retrain_unknown" # 生成任务ID task_id = str(uuid.uuid4()) @@ -4634,3 +4632,4 @@ def test_models_fix(): "message": str(e), "test_name": "ModelManager修复测试" }), 500 + diff --git a/server/core/config.py b/server/core/config.py index 0eefa17..c9d8c7a 100644 --- a/server/core/config.py +++ b/server/core/config.py @@ -36,7 +36,7 @@ DEVICE = get_device() # 使用 os.path.join 构造跨平台的路径 DEFAULT_DATA_PATH = os.path.join(PROJECT_ROOT, 'data', 'timeseries_training_data_sample_10s50p.parquet') DEFAULT_MODEL_DIR = os.path.join(PROJECT_ROOT, 'saved_models') -DEFAULT_FEATURES = ['sales', 'price', 'weekday', 'month', 'is_holiday', 'is_weekend', 'is_promotion', 'temperature'] +DEFAULT_FEATURES = ['sales', 'weekday', 'month', 'is_holiday', 'is_weekend', 'is_promotion', 'temperature'] # 时间序列参数 LOOK_BACK = 5 # 使用过去5天数据(适应小数据集) @@ -71,216 +71,6 @@ TRAINING_UPDATE_INTERVAL = 1 # 训练进度更新间隔(秒) # 创建模型保存目录 os.makedirs(DEFAULT_MODEL_DIR, exist_ok=True) -def get_next_model_version(product_id: str, model_type: str) -> str: - """ - 获取指定产品和模型类型的下一个版本号 - - Args: - product_id: 产品ID - model_type: 模型类型 - - Returns: - 下一个版本号,格式如 'v2', 'v3' 等 - """ - # 新格式:带版本号的文件 - pattern_new = f"{model_type}_model_product_{product_id}_v*.pth" - existing_files_new = glob.glob(os.path.join(DEFAULT_MODEL_DIR, pattern_new)) - - # 旧格式:不带版本号的文件(兼容性支持) - pattern_old = f"{model_type}_model_product_{product_id}.pth" - old_file_path = os.path.join(DEFAULT_MODEL_DIR, pattern_old) - has_old_format = os.path.exists(old_file_path) - - # 如果没有任何格式的文件,返回默认版本 - if not existing_files_new and not has_old_format: - return DEFAULT_VERSION - - # 提取新格式文件的版本号 - versions = [] - for file_path in existing_files_new: - filename = os.path.basename(file_path) - version_match = re.search(rf"_v(\d+)\.pth$", filename) - if version_match: - versions.append(int(version_match.group(1))) - - # 如果存在旧格式文件,将其视为v1 - if has_old_format: - versions.append(1) - print(f"检测到旧格式模型文件: {old_file_path},将其视为版本v1") - - if versions: - next_version_num = max(versions) + 1 - return f"v{next_version_num}" - else: - return DEFAULT_VERSION - -def get_model_file_path(product_id: str, model_type: str, version: str = None) -> str: - """ - 生成模型文件路径 - - Args: - product_id: 产品ID - model_type: 模型类型 - version: 版本号,如果为None则获取下一个版本 - - Returns: - 模型文件的完整路径 - """ - if version is None: - version = get_next_model_version(product_id, model_type) - - # 特殊处理v1版本:检查是否存在旧格式文件 - if version == "v1": - # 检查旧格式文件是否存在 - old_format_filename = f"{model_type}_model_product_{product_id}.pth" - old_format_path = os.path.join(DEFAULT_MODEL_DIR, old_format_filename) - - if os.path.exists(old_format_path): - print(f"找到旧格式模型文件: {old_format_path},将其作为v1版本") - return old_format_path - - # 使用新格式文件名 - filename = f"{model_type}_model_product_{product_id}_{version}.pth" - return os.path.join(DEFAULT_MODEL_DIR, filename) - -def get_model_versions(product_id: str, model_type: str) -> list: - """ - 获取指定产品和模型类型的所有版本 - - Args: - product_id: 产品ID - model_type: 模型类型 - - Returns: - 版本列表,按版本号排序 - """ - # 新格式:带版本号的文件 - pattern_new = f"{model_type}_model_product_{product_id}_v*.pth" - existing_files_new = glob.glob(os.path.join(DEFAULT_MODEL_DIR, pattern_new)) - - # 旧格式:不带版本号的文件(兼容性支持) - pattern_old = f"{model_type}_model_product_{product_id}.pth" - old_file_path = os.path.join(DEFAULT_MODEL_DIR, pattern_old) - has_old_format = os.path.exists(old_file_path) - - versions = [] - - # 处理新格式文件 - for file_path in existing_files_new: - filename = os.path.basename(file_path) - version_match = re.search(rf"_v(\d+)\.pth$", filename) - if version_match: - version_num = int(version_match.group(1)) - versions.append(f"v{version_num}") - - # 如果存在旧格式文件,将其视为v1 - if has_old_format: - if "v1" not in versions: # 避免重复添加 - versions.append("v1") - print(f"检测到旧格式模型文件: {old_file_path},将其视为版本v1") - - # 按版本号排序 - versions.sort(key=lambda v: int(v[1:])) - return versions - -def get_latest_model_version(product_id: str, model_type: str) -> str: - """ - 获取指定产品和模型类型的最新版本 - - Args: - product_id: 产品ID - model_type: 模型类型 - - Returns: - 最新版本号,如果没有则返回None - """ - versions = get_model_versions(product_id, model_type) - return versions[-1] if versions else None - -def save_model_version_info(product_id: str, model_type: str, version: str, file_path: str, metrics: dict = None): - """ - 保存模型版本信息到数据库 - - Args: - product_id: 产品ID - model_type: 模型类型 - version: 版本号 - file_path: 模型文件路径 - metrics: 模型性能指标 - """ - import sqlite3 - import json - from datetime import datetime - - try: - conn = sqlite3.connect('prediction_history.db') - cursor = conn.cursor() - - # 插入模型版本记录 - cursor.execute(''' - INSERT INTO model_versions ( - product_id, model_type, version, file_path, created_at, metrics, is_active - ) VALUES (?, ?, ?, ?, ?, ?, ?) - ''', ( - product_id, - model_type, - version, - file_path, - datetime.now().isoformat(), - json.dumps(metrics) if metrics else None, - 1 # 新模型默认为激活状态 - )) - - conn.commit() - conn.close() - print(f"已保存模型版本信息: {product_id}_{model_type}_{version}") - - except Exception as e: - print(f"保存模型版本信息失败: {str(e)}") - -def get_model_version_info(product_id: str, model_type: str, version: str = None): - """ - 从数据库获取模型版本信息 - - Args: - product_id: 产品ID - model_type: 模型类型 - version: 版本号,如果为None则获取最新版本 - - Returns: - 模型版本信息字典 - """ - import sqlite3 - import json - - try: - conn = sqlite3.connect('prediction_history.db') - conn.row_factory = sqlite3.Row - cursor = conn.cursor() - - if version: - cursor.execute(''' - SELECT * FROM model_versions - WHERE product_id = ? AND model_type = ? AND version = ? - ORDER BY created_at DESC LIMIT 1 - ''', (product_id, model_type, version)) - else: - cursor.execute(''' - SELECT * FROM model_versions - WHERE product_id = ? AND model_type = ? - ORDER BY created_at DESC LIMIT 1 - ''', (product_id, model_type)) - - row = cursor.fetchone() - conn.close() - - if row: - result = dict(row) - if result['metrics']: - result['metrics'] = json.loads(result['metrics']) - return result - return None - - except Exception as e: - print(f"获取模型版本信息失败: {str(e)}") - return None \ No newline at end of file +# 注意:所有与模型路径、版本管理相关的函数(如 get_next_model_version, get_model_file_path 等) +# 已被移除,因为这些功能现在由 server.utils.file_save.ModelPathManager 统一处理。 +# 这种集中化管理确保了整个应用程序遵循统一的、基于规范的扁平化文件保存策略。 \ No newline at end of file diff --git a/server/core/predictor.py b/server/core/predictor.py index 2dbc3ff..b732c8f 100644 --- a/server/core/predictor.py +++ b/server/core/predictor.py @@ -64,6 +64,7 @@ class PharmacyPredictor: learning_rate=0.001, sequence_length=30, forecast_horizon=7, hidden_size=64, num_layers=2, dropout=0.1, use_optimized=False, store_id=None, training_mode='product', aggregation_method='sum', + product_scope='all', product_ids=None, socketio=None, task_id=None, version=None, continue_training=False, progress_callback=None, path_info=None): """ @@ -123,29 +124,38 @@ class PharmacyPredictor: return None # 如果product_id是'unknown',则表示为店铺所有商品训练一个聚合模型 - if product_id == 'unknown': + if product_scope == 'specific' and product_ids: + # 为店铺的指定产品列表训练 + try: + # 从该店铺的数据中筛选出指定的产品 + store_data = self.data[self.data['store_id'] == store_id] + product_data = store_data[store_data['product_id'].isin(product_ids)].copy() + log_message(f"按店铺-指定药品训练: 店铺 {store_id}, {len(product_ids)}种药品, 数据量: {len(product_data)}") + except Exception as e: + log_message(f"获取店铺指定药品数据失败: {e}", 'error') + return None + elif product_id == 'unknown' or product_scope == 'all': + # 为店铺所有商品训练一个聚合模型 try: - # 使用新的聚合函数,按店铺聚合 product_data = aggregate_multi_store_data( store_id=store_id, aggregation_method=aggregation_method, file_path=self.data_path ) log_message(f"按店铺聚合训练: 店铺 {store_id}, 聚合方法 {aggregation_method}, 数据量: {len(product_data)}") - # 将product_id设置为店铺ID,以便模型保存时使用有意义的标识 - product_id = store_id + product_id = store_id # 使用店铺ID作为模型标识 except Exception as e: log_message(f"聚合店铺 {store_id} 数据失败: {e}", 'error') return None else: - # 为店铺的单个特定产品训练 + # 为店铺的单个特定产品训练(兼容旧逻辑) try: product_data = get_store_product_sales_data( store_id=store_id, product_id=product_id, file_path=self.data_path ) - log_message(f"按店铺-产品训练: 店铺 {store_id}, 产品 {product_id}, 数据量: {len(product_data)}") + log_message(f"按店铺-单个产品训练: 店铺 {store_id}, 产品 {product_id}, 数据量: {len(product_data)}") except Exception as e: log_message(f"获取店铺产品数据失败: {e}", 'error') return None @@ -189,7 +199,7 @@ class PharmacyPredictor: try: log_message(f"🤖 开始调用 {model_type} 训练器") if model_type == 'transformer': - model_result, metrics, actual_version = train_product_model_with_transformer( + model_result, metrics, actual_version, _ = train_product_model_with_transformer( product_id=product_id, product_df=product_data, store_id=store_id, @@ -205,7 +215,7 @@ class PharmacyPredictor: ) log_message(f"✅ {model_type} 训练器返回: metrics={type(metrics)}, version={actual_version}", 'success') elif model_type == 'mlstm': - _, metrics, _, _ = train_product_model_with_mlstm( + model_result, metrics, _, _ = train_product_model_with_mlstm( product_id=product_id, product_df=product_data, store_id=store_id, @@ -241,7 +251,7 @@ class PharmacyPredictor: path_info=path_info ) elif model_type == 'tcn': - _, metrics, _, _ = train_product_model_with_tcn( + model_result, metrics, _, _ = train_product_model_with_tcn( product_id=product_id, product_df=product_data, store_id=store_id, diff --git a/server/models/model_manager.py b/server/models/model_manager.py index 0d7be5b..7a39541 100644 --- a/server/models/model_manager.py +++ b/server/models/model_manager.py @@ -14,695 +14,31 @@ from .kan_model import KANForecaster class ModelManager: """ - 模型管理类:负责模型的保存、加载、列出和删除等操作 + 模型管理类:此类现在主要负责提供模型类的映射。 + 注意:所有与文件系统交互的逻辑(保存、加载、删除等)已被移除, + 并由 server.utils.file_save.ModelPathManager 统一处理, + 以遵循新的扁平化文件存储规范。 """ - def __init__(self, models_dir='models'): + def __init__(self): """ 初始化模型管理器 - - 参数: - models_dir: 模型存储目录 """ - self.models_dir = models_dir - self._ensure_model_dir() - - # 模型类型映射 + # 模型类型到其对应类的映射 self.model_types = { 'mlstm': MLSTMTransformer, 'transformer': TimeSeriesTransformer, 'kan': KANForecaster } - - def _ensure_model_dir(self): - """确保模型目录存在""" - if not os.path.exists(self.models_dir): - try: - os.makedirs(self.models_dir, exist_ok=True) - print(f"创建模型目录: {os.path.abspath(self.models_dir)}") - except Exception as e: - print(f"创建模型目录失败: {str(e)}") - raise - - def save_model(self, model, model_type, product_id, optimizer=None, - train_loss=None, test_loss=None, scaler_X=None, - scaler_y=None, features=None, look_back=None, T=None, - metrics=None, version=None): + + def get_model_class(self, model_type: str): """ - 保存模型及其相关信息 - - 参数: - model: 训练好的模型 - model_type: 模型类型 ('mlstm', 'transformer', 'kan') - product_id: 产品ID - optimizer: 优化器 - train_loss: 训练损失历史 - test_loss: 测试损失历史 - scaler_X: 特征缩放器 - scaler_y: 目标缩放器 - features: 使用的特征列表 - look_back: 回看天数 - T: 预测天数 - metrics: 模型评估指标 - version: 模型版本(可选),如果不提供则使用时间戳 + 根据模型类型字符串获取模型类。 + + Args: + model_type (str): 模型类型 (e.g., 'mlstm', 'kan')。 + + Returns: + 模型类,如果不存在则返回 None。 """ - self._ensure_model_dir() - - # 设置版本 - if version is None: - version = datetime.now().strftime("%Y%m%d_%H%M%S") - - # 设置文件名 - model_filename = f"{product_id}_{model_type}_model_v{version}.pt" - model_path = os.path.join(self.models_dir, model_filename) - - # 准备要保存的数据 - save_dict = { - 'model_state_dict': model.state_dict(), - 'model_type': model_type, - 'product_id': product_id, - 'version': version, - 'created_at': datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - 'features': features, - 'look_back': look_back, - 'T': T - } - - # 添加可选数据 - if optimizer is not None: - save_dict['optimizer_state_dict'] = optimizer.state_dict() - if train_loss is not None: - save_dict['train_loss'] = train_loss - if test_loss is not None: - save_dict['test_loss'] = test_loss - if scaler_X is not None: - save_dict['scaler_X'] = scaler_X - if scaler_y is not None: - save_dict['scaler_y'] = scaler_y - if metrics is not None: - save_dict['metrics'] = metrics - - try: - # 保存模型 - torch.save(save_dict, model_path) - print(f"模型已成功保存到 {os.path.abspath(model_path)}") - - # 保存模型的元数据到JSON文件,便于查询 - meta_path = os.path.join(self.models_dir, f"{product_id}_{model_type}_meta_v{version}.json") - meta_dict = {k: str(v) if not isinstance(v, (int, float, bool, list, dict, type(None))) else v - for k, v in save_dict.items() if k != 'model_state_dict' and - k != 'optimizer_state_dict' and k != 'scaler_X' and k != 'scaler_y'} - - # 如果有评估指标,添加到元数据 - if metrics is not None: - meta_dict['metrics'] = metrics - - with open(meta_path, 'w') as f: - json.dump(meta_dict, f, indent=4) - - return model_path - except Exception as e: - print(f"保存模型时出错: {str(e)}") - raise - - def load_model(self, product_id, model_type='mlstm', version=None, device=None): - """ - 加载指定的模型 - - 参数: - product_id: 产品ID - model_type: 模型类型 ('mlstm', 'transformer', 'kan') - version: 模型版本,如果不指定则加载最新版本 - device: 设备 (cuda/cpu) - - 返回: - model: 加载的模型 - checkpoint: 包含模型信息的字典 - """ - if device is None: - device = get_device() - - # 查找匹配的模型文件 - if version is None: - # 查找最新版本 - pattern = os.path.join(self.models_dir, f"{product_id}_{model_type}_model_v*.pt") - model_files = glob.glob(pattern) - - if not model_files: - print(f"错误: 未找到产品 {product_id} 的 {model_type} 模型文件") - return None, None - - # 按照文件修改时间排序,获取最新的 - model_path = max(model_files, key=os.path.getmtime) - else: - # 指定版本 - model_path = os.path.join(self.models_dir, f"{product_id}_{model_type}_model_v{version}.pt") - if not os.path.exists(model_path): - print(f"错误: 未找到产品 {product_id} 的 {model_type} 模型版本 {version}") - return None, None - - try: - # 加载模型 - checkpoint = torch.load(model_path, map_location=device) - - # 创建模型实例 - if model_type == 'mlstm': - model = MLSTMTransformer( - num_features=len(checkpoint['features']), - hidden_size=128, - mlstm_layers=1, - embed_dim=32, - dense_dim=32, - num_heads=4, - dropout_rate=0.1, - num_blocks=3, - output_sequence_length=checkpoint['T'] - ) - elif model_type == 'transformer': - model = TimeSeriesTransformer( - num_features=len(checkpoint['features']), - d_model=32, - nhead=4, - num_encoder_layers=3, - dim_feedforward=32, - dropout=0.1, - output_sequence_length=checkpoint['T'] - ) - elif model_type == 'kan': - model = KANForecaster( - input_features=len(checkpoint['features']), - hidden_sizes=[64, 128, 64], - output_size=1, - grid_size=5, - spline_order=3, - dropout_rate=0.1, - output_sequence_length=checkpoint['T'] - ) - else: - raise ValueError(f"不支持的模型类型: {model_type}") - - # 加载模型参数 - model.load_state_dict(checkpoint['model_state_dict']) - model = model.to(device) - model.eval() - - print(f"模型已从 {os.path.abspath(model_path)} 成功加载") - return model, checkpoint - except Exception as e: - print(f"加载模型时出错: {str(e)}") - raise - - def list_models(self, product_id=None, model_type=None): - """ - 列出所有保存的模型 - - 参数: - product_id: 按产品ID筛选 (可选) - model_type: 按模型类型筛选 (可选) - - 返回: - models_list: 模型信息列表 - """ - self._ensure_model_dir() - - # 构建搜索模式 - if product_id and model_type: - pattern = os.path.join(self.models_dir, f"{product_id}_{model_type}_model_v*.pt") - elif product_id: - pattern = os.path.join(self.models_dir, f"{product_id}_*_model_v*.pt") - elif model_type: - pattern = os.path.join(self.models_dir, f"*_{model_type}_model_v*.pt") - else: - pattern = os.path.join(self.models_dir, "*_model_v*.pt") - - model_files = glob.glob(pattern) - - if not model_files: - print("未找到匹配的模型文件") - return [] - - # 收集模型信息 - models_list = [] - for model_path in model_files: - try: - # 从文件名解析信息 - filename = os.path.basename(model_path) - parts = filename.split('_') - if len(parts) < 4: - continue - - product_id = parts[0] - model_type = parts[1] - version = parts[-1].replace('model_v', '').replace('.pt', '') - - # 查找对应的元数据文件 - meta_path = os.path.join(self.models_dir, f"{product_id}_{model_type}_meta_v{version}.json") - - model_info = { - 'product_id': product_id, - 'model_type': model_type, - 'version': version, - 'file_path': model_path, - 'created_at': datetime.fromtimestamp(os.path.getctime(model_path)).strftime("%Y-%m-%d %H:%M:%S"), - 'file_size': f"{os.path.getsize(model_path) / (1024 * 1024):.2f} MB" - } - - # 如果有元数据文件,添加更多信息 - if os.path.exists(meta_path): - with open(meta_path, 'r') as f: - meta = json.load(f) - model_info.update(meta) - - models_list.append(model_info) - except Exception as e: - print(f"解析模型文件 {model_path} 时出错: {str(e)}") - - # 按创建时间排序 - models_list.sort(key=lambda x: x['created_at'], reverse=True) - - return models_list - - def delete_model(self, product_id, model_type, version=None): - """ - 删除指定的模型 - - 参数: - product_id: 产品ID - model_type: 模型类型 - version: 模型版本,如果不指定则删除所有版本 - - 返回: - success: 是否成功删除 - """ - self._ensure_model_dir() - - if version: - # 删除特定版本 - model_path = os.path.join(self.models_dir, f"{product_id}_{model_type}_model_v{version}.pt") - meta_path = os.path.join(self.models_dir, f"{product_id}_{model_type}_meta_v{version}.json") - - if not os.path.exists(model_path): - print(f"错误: 未找到产品 {product_id} 的 {model_type} 模型版本 {version}") - return False - - try: - os.remove(model_path) - if os.path.exists(meta_path): - os.remove(meta_path) - print(f"已删除产品 {product_id} 的 {model_type} 模型版本 {version}") - return True - except Exception as e: - print(f"删除模型时出错: {str(e)}") - return False - else: - # 删除所有版本 - pattern = os.path.join(self.models_dir, f"{product_id}_{model_type}_model_v*.pt") - meta_pattern = os.path.join(self.models_dir, f"{product_id}_{model_type}_meta_v*.json") - - model_files = glob.glob(pattern) - meta_files = glob.glob(meta_pattern) - - if not model_files: - print(f"错误: 未找到产品 {product_id} 的 {model_type} 模型文件") - return False - - try: - for file_path in model_files: - os.remove(file_path) - - for file_path in meta_files: - os.remove(file_path) - - print(f"已删除产品 {product_id} 的所有 {model_type} 模型") - return True - except Exception as e: - print(f"删除模型时出错: {str(e)}") - return False - - def get_model_details(self, product_id, model_type, version=None): - """ - 获取模型的详细信息 - - 参数: - product_id: 产品ID - model_type: 模型类型 - version: 模型版本,如果不指定则获取最新版本 - - 返回: - details: 模型详细信息字典 - """ - # 查找匹配的模型文件 - if version is None: - # 查找最新版本 - pattern = os.path.join(self.models_dir, f"{product_id}_{model_type}_model_v*.pt") - model_files = glob.glob(pattern) - - if not model_files: - print(f"错误: 未找到产品 {product_id} 的 {model_type} 模型文件") - return None - - # 按照文件修改时间排序,获取最新的 - model_path = max(model_files, key=os.path.getmtime) - # 从文件名解析版本 - filename = os.path.basename(model_path) - version = filename.split('_')[-1].replace('model_v', '').replace('.pt', '') - - # 查找元数据文件 - meta_path = os.path.join(self.models_dir, f"{product_id}_{model_type}_meta_v{version}.json") - - if not os.path.exists(meta_path): - print(f"错误: 未找到产品 {product_id} 的 {model_type} 模型版本 {version} 的元数据") - return None - - try: - with open(meta_path, 'r') as f: - details = json.load(f) - - # 添加文件路径 - model_path = os.path.join(self.models_dir, f"{product_id}_{model_type}_model_v{version}.pt") - details['file_path'] = model_path - details['file_size'] = f"{os.path.getsize(model_path) / (1024 * 1024):.2f} MB" - - return details - except Exception as e: - print(f"获取模型详情时出错: {str(e)}") - return None - - def predict_with_model(self, product_id, model_type='mlstm', version=None, future_days=7, - product_df=None, features=None, visualize=True, save_results=True): - """ - 使用指定的模型进行预测 - - 参数: - product_id: 产品ID - model_type: 模型类型 ('mlstm', 'transformer', 'kan') - version: 模型版本,如果不指定则使用最新版本 - future_days: 要预测的未来天数 - product_df: 产品数据DataFrame - features: 特征列表 - visualize: 是否可视化结果 - save_results: 是否保存结果 - - 返回: - predictions_df: 预测结果DataFrame - """ - # 获取设备 - device = get_device() - print(f"使用设备: {device} 进行预测") - - # 加载模型 - model, checkpoint = self.load_model(product_id, model_type, version, device) - - if model is None or checkpoint is None: - return None - - # 如果没有提供产品数据,则从Excel文件加载 - if product_df is None: - try: - df = pd.read_excel('pharmacy_sales.xlsx') - product_df = df[df['product_id'] == product_id].sort_values('date') - except Exception as e: - print(f"加载产品数据时出错: {str(e)}") - return None - - product_name = product_df['product_name'].iloc[0] - - # 获取模型参数 - features = checkpoint['features'] - look_back = checkpoint['look_back'] - T = checkpoint['T'] - scaler_X = checkpoint['scaler_X'] - scaler_y = checkpoint['scaler_y'] - - # 获取最近的look_back天数据 - last_data = product_df[features].values[-look_back:] - last_data_scaled = scaler_X.transform(last_data) - - # 准备输入数据 - X_input = torch.Tensor(last_data_scaled).unsqueeze(0) # 添加批次维度 - X_input = X_input.to(device) # 移动到设备上 - - # 进行预测 - with torch.no_grad(): - y_pred_scaled = model(X_input).squeeze(0).cpu().numpy() # 返回到CPU并转换为numpy - - # 反归一化预测结果 - y_pred = scaler_y.inverse_transform(y_pred_scaled.reshape(-1, 1)).flatten() - - # 创建预测日期范围 - last_date = product_df['date'].iloc[-1] - future_dates = pd.date_range(start=last_date + pd.Timedelta(days=1), periods=T, freq='D') - - # 创建预测结果DataFrame - predictions_df = pd.DataFrame({ - 'date': future_dates, - 'product_id': product_id, - 'product_name': product_name, - 'predicted_sales': y_pred - }) - - print(f"\n{product_name} 未来 {T} 天销售预测 (使用{model_type.upper()}模型):") - print(predictions_df[['date', 'predicted_sales']]) - - # 可视化预测结果 - if visualize: - plt.figure(figsize=(12, 6)) - - # 显示历史数据和预测数据 - history_days = 30 # 显示最近30天的历史数据 - history_dates = product_df['date'].iloc[-history_days:].values - history_sales = product_df['sales'].iloc[-history_days:].values - - plt.plot(history_dates, history_sales, 'b-', label='历史销量') - plt.plot(future_dates, y_pred, 'r--', label=f'{model_type.upper()}预测销量') - - plt.title(f'{product_name} - {model_type.upper()}销量预测 (未来{T}天)') - plt.xlabel('日期') - plt.ylabel('销量') - plt.legend() - plt.grid(True) - plt.xticks(rotation=45) - plt.tight_layout() - - # 保存和显示图表 - forecast_chart = f'{product_id}_{model_type}_forecast.png' - plt.savefig(forecast_chart) - print(f"预测图表已保存为: {forecast_chart}") - - # 保存预测结果到CSV - if save_results: - forecast_csv = f'{product_id}_{model_type}_forecast.csv' - predictions_df.to_csv(forecast_csv, index=False) - print(f"预测结果已保存到: {forecast_csv}") - - return predictions_df - - def compare_models(self, product_id, model_types=None, versions=None, product_df=None, visualize=True): - """ - 比较不同模型的预测结果 - - 参数: - product_id: 产品ID - model_types: 要比较的模型类型列表 - versions: 对应的模型版本列表,如果不指定则使用最新版本 - product_df: 产品数据DataFrame - visualize: 是否可视化结果 - - 返回: - 比较结果DataFrame - """ - if model_types is None: - model_types = ['mlstm', 'transformer', 'kan'] - - if versions is None: - versions = [None] * len(model_types) - - if len(versions) != len(model_types): - print("错误: 模型类型和版本列表长度不匹配") - return None - - # 如果没有提供产品数据,则从Excel文件加载 - if product_df is None: - try: - df = pd.read_excel('pharmacy_sales.xlsx') - product_df = df[df['product_id'] == product_id].sort_values('date') - except Exception as e: - print(f"加载产品数据时出错: {str(e)}") - return None - - product_name = product_df['product_name'].iloc[0] - - # 存储所有模型的预测结果 - predictions = {} - - # 对每个模型进行预测 - for i, model_type in enumerate(model_types): - version = versions[i] - - try: - pred_df = self.predict_with_model( - product_id, - model_type=model_type, - version=version, - product_df=product_df, - visualize=False, - save_results=False - ) - - if pred_df is not None: - predictions[model_type] = pred_df - except Exception as e: - print(f"{model_type} 模型预测出错: {str(e)}") - - if not predictions: - print("没有成功的预测结果") - return None - - # 合并预测结果 - result_df = predictions[list(predictions.keys())[0]][['date', 'product_id', 'product_name']].copy() - - for model_type, pred_df in predictions.items(): - result_df[f'{model_type}_prediction'] = pred_df['predicted_sales'].values - - # 可视化比较结果 - if visualize and len(predictions) > 0: - plt.figure(figsize=(12, 6)) - - # 显示历史数据 - history_days = 30 # 显示最近30天的历史数据 - history_dates = product_df['date'].iloc[-history_days:].values - history_sales = product_df['sales'].iloc[-history_days:].values - - plt.plot(history_dates, history_sales, 'k-', label='历史销量') - - # 显示预测数据 - colors = ['r', 'g', 'b', 'c', 'm', 'y'] - future_dates = result_df['date'].values - - for i, (model_type, pred_df) in enumerate(predictions.items()): - color = colors[i % len(colors)] - plt.plot(future_dates, pred_df['predicted_sales'].values, - f'{color}--', label=f'{model_type.upper()}预测') - - plt.title(f'{product_name} - 不同模型预测结果比较') - plt.xlabel('日期') - plt.ylabel('销量') - plt.legend() - plt.grid(True) - plt.xticks(rotation=45) - plt.tight_layout() - - # 保存和显示图表 - compare_chart = f'{product_id}_model_comparison.png' - plt.savefig(compare_chart) - print(f"比较图表已保存为: {compare_chart}") - - # 保存比较结果到CSV - compare_csv = f'{product_id}_model_comparison.csv' - result_df.to_csv(compare_csv, index=False) - print(f"比较结果已保存到: {compare_csv}") - - return result_df - - def export_model(self, product_id, model_type, version=None, export_dir='exported_models'): - """ - 导出模型到指定目录 - - 参数: - product_id: 产品ID - model_type: 模型类型 - version: 模型版本,如果不指定则导出最新版本 - export_dir: 导出目录 - - 返回: - export_path: 导出的文件路径 - """ - # 确保导出目录存在 - if not os.path.exists(export_dir): - os.makedirs(export_dir, exist_ok=True) - - # 查找匹配的模型文件 - if version is None: - # 查找最新版本 - pattern = os.path.join(self.models_dir, f"{product_id}_{model_type}_model_v*.pt") - model_files = glob.glob(pattern) - - if not model_files: - print(f"错误: 未找到产品 {product_id} 的 {model_type} 模型文件") - return None - - # 按照文件修改时间排序,获取最新的 - model_path = max(model_files, key=os.path.getmtime) - # 从文件名解析版本 - filename = os.path.basename(model_path) - version = filename.split('_')[-1].replace('model_v', '').replace('.pt', '') - else: - model_path = os.path.join(self.models_dir, f"{product_id}_{model_type}_model_v{version}.pt") - if not os.path.exists(model_path): - print(f"错误: 未找到产品 {product_id} 的 {model_type} 模型版本 {version}") - return None - - # 元数据文件 - meta_path = os.path.join(self.models_dir, f"{product_id}_{model_type}_meta_v{version}.json") - - # 导出路径 - export_model_path = os.path.join(export_dir, f"{product_id}_{model_type}_model_v{version}.pt") - export_meta_path = os.path.join(export_dir, f"{product_id}_{model_type}_meta_v{version}.json") - - try: - # 复制文件 - shutil.copy2(model_path, export_model_path) - if os.path.exists(meta_path): - shutil.copy2(meta_path, export_meta_path) - - print(f"模型已导出到 {os.path.abspath(export_model_path)}") - return export_model_path - except Exception as e: - print(f"导出模型时出错: {str(e)}") - return None - - def import_model(self, import_file, overwrite=False): - """ - 导入模型文件 - - 参数: - import_file: 要导入的模型文件路径 - overwrite: 如果存在同名文件是否覆盖 - - 返回: - import_path: 导入后的文件路径 - """ - self._ensure_model_dir() - - if not os.path.exists(import_file): - print(f"错误: 导入文件 {import_file} 不存在") - return None - - # 获取文件名 - filename = os.path.basename(import_file) - - # 目标路径 - target_path = os.path.join(self.models_dir, filename) - - # 检查是否存在同名文件 - if os.path.exists(target_path) and not overwrite: - print(f"错误: 目标文件 {target_path} 已存在,如需覆盖请设置overwrite=True") - return None - - try: - # 复制文件 - shutil.copy2(import_file, target_path) - - # 如果有对应的元数据文件,也一并导入 - meta_filename = filename.replace('_model_v', '_meta_v') - meta_import_file = import_file.replace('_model_v', '_meta_v').replace('.pt', '.json') - meta_target_path = os.path.join(self.models_dir, meta_filename.replace('.pt', '.json')) - - if os.path.exists(meta_import_file): - shutil.copy2(meta_import_file, meta_target_path) - - print(f"模型已导入到 {os.path.abspath(target_path)}") - return target_path - except Exception as e: - print(f"导入模型时出错: {str(e)}") - return None \ No newline at end of file + return self.model_types.get(model_type) \ No newline at end of file diff --git a/server/predictors/model_predictor.py b/server/predictors/model_predictor.py index cea7b8a..8acf361 100644 --- a/server/predictors/model_predictor.py +++ b/server/predictors/model_predictor.py @@ -21,82 +21,50 @@ from models.optimized_kan_forecaster import OptimizedKANForecaster from analysis.trend_analysis import analyze_prediction_result from utils.visualization import plot_prediction_results from utils.multi_store_data_utils import get_store_product_sales_data, aggregate_multi_store_data -from core.config import DEVICE, get_model_file_path +from core.config import DEVICE +from utils.file_save import ModelPathManager -def load_model_and_predict(product_id, model_type, store_id=None, future_days=7, start_date=None, analyze_result=False, version=None): +def load_model_and_predict(product_id, model_type, model_path=None, store_id=None, future_days=7, start_date=None, analyze_result=False, version=None, training_mode='product', **kwargs): """ 加载已训练的模型并进行预测 参数: product_id: 产品ID model_type: 模型类型 ('transformer', 'mlstm', 'kan', 'tcn', 'optimized_kan') + model_path: 模型的完整文件路径 store_id: 店铺ID,为None时使用全局模型 future_days: 预测未来天数 start_date: 预测起始日期,如果为None则使用最后一个已知日期 analyze_result: 是否分析预测结果 - version: 模型版本,如果为None则使用最新版本 + version: 模型版本 返回: 预测结果和分析(如果analyze_result为True) """ try: - # 确定模型文件路径(支持多店铺) - model_path = None - - if version: - # 使用版本管理系统获取正确的文件路径 - model_path = get_model_file_path(product_id, model_type, version) - else: - # 根据store_id确定搜索目录 - if store_id: - # 查找特定店铺的模型 - possible_dirs = [ - os.path.join('saved_models', model_type, store_id), - os.path.join('models', model_type, store_id) - ] - else: - # 查找全局模型 - possible_dirs = [ - os.path.join('saved_models', model_type, 'global'), - os.path.join('models', model_type, 'global'), - os.path.join('saved_models', model_type), # 后向兼容 - 'saved_models' # 最基本的目录 - ] - - # 文件名模式 - model_suffix = '_optimized' if model_type == 'optimized_kan' else '' - file_model_type = 'kan' if model_type == 'optimized_kan' else model_type - - possible_names = [ - f"{product_id}_{model_type}_v1_model.pt", # 新多店铺格式 - f"{product_id}_{model_type}_v1_global_model.pt", # 全局模型格式 - f"{product_id}_{model_type}_v1.pth", # 旧版本格式 - f"{file_model_type}{model_suffix}_model_product_{product_id}.pth", # 原始格式 - f"{model_type}_model_product_{product_id}.pth" # 简化格式 - ] - - # 搜索模型文件 - for dir_path in possible_dirs: - if not os.path.exists(dir_path): - continue - for name in possible_names: - test_path = os.path.join(dir_path, name) - if os.path.exists(test_path): - model_path = test_path - break - if model_path: - break - - if not model_path: - scope_msg = f"店铺 {store_id}" if store_id else "全局" - print(f"找不到产品 {product_id} 的 {model_type} 模型文件 ({scope_msg})") - print(f"搜索目录: {possible_dirs}") - return None - print(f"尝试加载模型文件: {model_path}") - if not os.path.exists(model_path): - print(f"模型文件 {model_path} 不存在") + # 如果没有提供 model_path,则使用 ModelPathManager 动态生成 + if not model_path: + if version is None: + raise ValueError("使用动态路径加载时必须提供 'version'。") + + path_manager = ModelPathManager() + # 传递所有必要的参数以重构路径 + path_params = { + 'product_id': product_id, + 'store_id': store_id, + **kwargs + } + model_path = path_manager.get_model_path_for_prediction( + training_mode=training_mode, + model_type=model_type, + version=version, + **path_params + ) + + if not model_path or not os.path.exists(model_path): + print(f"模型文件 {model_path} 不存在或无法生成。") return None # 加载销售数据(支持多店铺) @@ -104,9 +72,9 @@ def load_model_and_predict(product_id, model_type, store_id=None, future_days=7, if store_id: # 加载特定店铺的数据 product_df = get_store_product_sales_data( - store_id, + store_id, product_id, - 'pharmacy_sales_multi_store.csv' + None # 使用默认数据路径 ) store_name = product_df['store_name'].iloc[0] if 'store_name' in product_df.columns else f"店铺{store_id}" prediction_scope = f"店铺 '{store_name}' ({store_id})" @@ -115,14 +83,16 @@ def load_model_and_predict(product_id, model_type, store_id=None, future_days=7, product_df = aggregate_multi_store_data( product_id, aggregation_method='sum', - file_path='pharmacy_sales_multi_store.csv' + file_path=None # 使用默认数据路径 ) prediction_scope = "全部店铺(聚合数据)" except Exception as e: print(f"多店铺数据加载失败,尝试使用原始数据格式: {e}") # 后向兼容:尝试加载原始数据格式 try: - df = pd.read_excel('pharmacy_sales.xlsx') + from core.config import DEFAULT_DATA_PATH + from utils.multi_store_data_utils import load_multi_store_data + df = load_multi_store_data(DEFAULT_DATA_PATH) product_df = df[df['product_id'] == product_id].sort_values('date') if store_id: print(f"警告:原始数据不支持店铺过滤,将使用所有数据预测") @@ -262,7 +232,7 @@ def load_model_and_predict(product_id, model_type, store_id=None, future_days=7, # 准备输入数据 try: - features = ['sales', 'price', 'weekday', 'month', 'is_holiday', 'is_weekend', 'is_promotion', 'temperature'] + features = ['sales', 'weekday', 'month', 'is_holiday', 'is_weekend', 'is_promotion', 'temperature'] sequence_length = config['sequence_length'] # 获取最近的sequence_length天数据作为输入 diff --git a/server/trainers/kan_trainer.py b/server/trainers/kan_trainer.py index 7227d56..fe38512 100644 --- a/server/trainers/kan_trainer.py +++ b/server/trainers/kan_trainer.py @@ -103,7 +103,7 @@ def train_product_model_with_kan(product_id, product_df=None, store_id=None, tra print(f"使用{model_type}模型训练产品 '{product_name}' (ID: {product_id}) 的销售预测模型") print(f"训练范围: {training_scope}") print(f"使用设备: {DEVICE}") - print(f"模型将保存到目录: {path_info['version_dir']}") + print(f"模型将保存到: {path_info['base_dir']}") # 创建特征和目标变量 features = ['sales', 'weekday', 'month', 'is_holiday', 'is_weekend', 'is_promotion', 'temperature'] @@ -297,9 +297,14 @@ def train_product_model_with_kan(product_id, product_df=None, store_id=None, tra 'loss_curve_path': loss_curve_path } - # 使用 path_info 中的路径保存模型 - model_path = path_info['model_path'] - torch.save(model_data, model_path) - print(f"模型已保存到: {model_path}") + # 检查模型性能是否达标 + # 移除R2检查,始终保存模型 + if metrics: + # 使用 path_info 中的路径保存模型 + model_path = path_info['model_path'] + torch.save(model_data, model_path) + print(f"模型已保存到: {model_path}") + else: + print(f"训练过程中未生成评估指标,不保存最终模型。") return model, metrics \ No newline at end of file diff --git a/server/trainers/mlstm_trainer.py b/server/trainers/mlstm_trainer.py index 151840a..a542c5c 100644 --- a/server/trainers/mlstm_trainer.py +++ b/server/trainers/mlstm_trainer.py @@ -20,24 +20,28 @@ from utils.multi_store_data_utils import get_store_product_sales_data, aggregate from utils.visualization import plot_loss_curve from analysis.metrics import evaluate_model from core.config import ( - DEVICE, DEFAULT_MODEL_DIR, LOOK_BACK, FORECAST_HORIZON, - get_next_model_version, get_model_file_path, get_latest_model_version + DEVICE, DEFAULT_MODEL_DIR, LOOK_BACK, FORECAST_HORIZON ) from utils.training_progress import progress_manager def save_checkpoint(checkpoint_data: dict, epoch_or_label, path_info: dict): """ - 保存训练检查点 + 保存训练检查点 (已适配扁平化路径规范) Args: checkpoint_data: 检查点数据 - epoch_or_label: epoch编号或标签(如'best', 'final', 50) + epoch_or_label: epoch编号或标签(如'best'或整数) path_info (dict): 包含所有路径信息的字典 """ if epoch_or_label == 'best': + # 使用由 ModelPathManager 直接提供的最佳检查点路径 checkpoint_path = path_info['best_checkpoint_path'] else: - checkpoint_path = os.path.join(path_info['checkpoint_dir'], f"checkpoint_epoch_{epoch_or_label}.pth") + # 使用 epoch 检查点模板生成路径 + template = path_info.get('epoch_checkpoint_template') + if not template: + raise ValueError("路径信息 'path_info' 中缺少 'epoch_checkpoint_template'。") + checkpoint_path = template.format(N=epoch_or_label) # 保存检查点 torch.save(checkpoint_data, checkpoint_path) @@ -45,49 +49,6 @@ def save_checkpoint(checkpoint_data: dict, epoch_or_label, path_info: dict): return checkpoint_path - -def load_checkpoint(product_id: str, model_type: str, epoch_or_label, - model_dir: str, store_id=None, training_mode: str = 'product', - aggregation_method=None): - """ - 加载训练检查点 - - Args: - product_id: 产品ID - model_type: 模型类型 - epoch_or_label: epoch编号或标签 - model_dir: 模型保存目录 - store_id: 店铺ID - training_mode: 训练模式 - aggregation_method: 聚合方法 - - Returns: - checkpoint_data: 检查点数据,如果未找到返回None - """ - checkpoint_dir = os.path.join(model_dir, 'checkpoints') - - # 生成检查点文件名 - if training_mode == 'store' and store_id: - filename = f"{model_type}_store_{store_id}_{product_id}_epoch_{epoch_or_label}.pth" - elif training_mode == 'global' and aggregation_method: - filename = f"{model_type}_global_{product_id}_{aggregation_method}_epoch_{epoch_or_label}.pth" - else: - filename = f"{model_type}_product_{product_id}_epoch_{epoch_or_label}.pth" - - checkpoint_path = os.path.join(checkpoint_dir, filename) - - if os.path.exists(checkpoint_path): - try: - checkpoint_data = torch.load(checkpoint_path, map_location=DEVICE) - print(f"[mLSTM] 检查点已加载: {checkpoint_path}", flush=True) - return checkpoint_data - except Exception as e: - print(f"[mLSTM] 加载检查点失败: {e}", flush=True) - return None - else: - print(f"[mLSTM] 检查点文件不存在: {checkpoint_path}", flush=True) - return None - def train_product_model_with_mlstm( product_id, product_df, @@ -217,7 +178,7 @@ def train_product_model_with_mlstm( print(f"[mLSTM] 训练范围: {training_scope}", flush=True) print(f"[mLSTM] 版本: v{version}", flush=True) print(f"[mLSTM] 使用设备: {DEVICE}", flush=True) - print(f"[mLSTM] 模型将保存到目录: {path_info['version_dir']}", flush=True) + print(f"[mLSTM] 模型将保存到: {path_info['base_dir']}", flush=True) print(f"[mLSTM] 数据量: {len(product_df)} 条记录", flush=True) emit_progress(f"训练产品: {product_name} (ID: {product_id}) - {training_scope}") @@ -535,11 +496,17 @@ def train_product_model_with_mlstm( } } - # 保存最终模型到 model.pth - final_model_path = path_info['model_path'] - torch.save(final_model_data, final_model_path) - print(f"[mLSTM] 最终模型已保存: {final_model_path}", flush=True) - + # 检查模型性能是否达标 + # 移除R2检查,始终保存模型 + if metrics: + # 保存最终模型到 model.pth + final_model_path = path_info['model_path'] + torch.save(final_model_data, final_model_path) + print(f"[mLSTM] 最终模型已保存: {final_model_path}", flush=True) + else: + final_model_path = None + print(f"[mLSTM] 训练过程中未生成评估指标,不保存最终模型。", flush=True) + # 发送训练完成消息 final_metrics = { 'mse': metrics['mse'], @@ -552,6 +519,9 @@ def train_product_model_with_mlstm( 'model_path': final_model_path } - emit_progress(f"✅ mLSTM模型训练完成!最终epoch: {epochs} 已保存", progress=100, metrics=final_metrics) - - return model, metrics, epochs, final_model_path \ No newline at end of file + if final_model_path: + emit_progress(f"✅ mLSTM模型训练完成!最终epoch: {epochs} 已保存", progress=100, metrics=final_metrics) + else: + emit_progress(f"❌ mLSTM模型训练失败:性能不达标", progress=100, metrics={'error': '模型性能不佳'}) + + return model, metrics, epochs, final_model_path \ No newline at end of file diff --git a/server/trainers/tcn_trainer.py b/server/trainers/tcn_trainer.py index f05314c..c0e34a4 100644 --- a/server/trainers/tcn_trainer.py +++ b/server/trainers/tcn_trainer.py @@ -31,9 +31,14 @@ def save_checkpoint(checkpoint_data: dict, epoch_or_label, path_info: dict): path_info (dict): 包含所有路径信息的字典 """ if epoch_or_label == 'best': + # 使用由 ModelPathManager 直接提供的最佳检查点路径 checkpoint_path = path_info['best_checkpoint_path'] else: - checkpoint_path = os.path.join(path_info['checkpoint_dir'], f"checkpoint_epoch_{epoch_or_label}.pth") + # 使用 epoch 检查点模板生成路径 + template = path_info.get('epoch_checkpoint_template') + if not template: + raise ValueError("路径信息 'path_info' 中缺少 'epoch_checkpoint_template'。") + checkpoint_path = template.format(N=epoch_or_label) # 保存检查点 torch.save(checkpoint_data, checkpoint_path) @@ -163,7 +168,7 @@ def train_product_model_with_tcn( print(f"训练范围: {training_scope}") print(f"版本: v{version}") print(f"使用设备: {DEVICE}") - print(f"模型将保存到目录: {path_info['version_dir']}") + print(f"模型将保存到: {path_info['base_dir']}") emit_progress(f"训练产品: {product_name} (ID: {product_id})") @@ -464,12 +469,17 @@ def train_product_model_with_tcn( progress_manager.set_stage("model_saving", 50) - # 保存最终模型 - final_model_path = path_info['model_path'] - torch.save(final_model_data, final_model_path) - print(f"[TCN] 最终模型已保存: {final_model_path}", flush=True) - - progress_manager.set_stage("model_saving", 100) + # 检查模型性能是否达标 + # 移除R2检查,始终保存模型 + if metrics: + # 保存最终模型 + final_model_path = path_info['model_path'] + torch.save(final_model_data, final_model_path) + print(f"[TCN] 最终模型已保存: {final_model_path}", flush=True) + progress_manager.set_stage("model_saving", 100) + else: + final_model_path = None + print(f"[TCN] 训练过程中未生成评估指标,不保存最终模型。", flush=True) final_metrics = { 'mse': metrics['mse'], @@ -481,6 +491,9 @@ def train_product_model_with_tcn( 'final_epoch': epochs } - emit_progress(f"模型训练完成!最终epoch: {epochs}", progress=100, metrics=final_metrics) + if final_model_path: + emit_progress(f"模型训练完成!最终epoch: {epochs}", progress=100, metrics=final_metrics) + else: + emit_progress(f"❌ TCN模型训练失败:性能不达标", progress=100, metrics={'error': '模型性能不佳'}) - return model, metrics, epochs, final_model_path \ No newline at end of file + return model, metrics, epochs, final_model_path \ No newline at end of file diff --git a/server/trainers/transformer_trainer.py b/server/trainers/transformer_trainer.py index e503295..981afdb 100644 --- a/server/trainers/transformer_trainer.py +++ b/server/trainers/transformer_trainer.py @@ -21,8 +21,7 @@ from utils.multi_store_data_utils import get_store_product_sales_data, aggregate from utils.visualization import plot_loss_curve from analysis.metrics import evaluate_model from core.config import ( - DEVICE, DEFAULT_MODEL_DIR, LOOK_BACK, FORECAST_HORIZON, - get_next_model_version, get_model_file_path, get_latest_model_version + DEVICE, DEFAULT_MODEL_DIR, LOOK_BACK, FORECAST_HORIZON ) from utils.training_progress import progress_manager from utils.model_manager import model_manager @@ -37,9 +36,14 @@ def save_checkpoint(checkpoint_data: dict, epoch_or_label, path_info: dict): path_info (dict): 包含所有路径信息的字典 """ if epoch_or_label == 'best': + # 使用由 ModelPathManager 直接提供的最佳检查点路径 checkpoint_path = path_info['best_checkpoint_path'] else: - checkpoint_path = os.path.join(path_info['checkpoint_dir'], f"checkpoint_epoch_{epoch_or_label}.pth") + # 使用 epoch 检查点模板生成路径 + template = path_info.get('epoch_checkpoint_template') + if not template: + raise ValueError("路径信息 'path_info' 中缺少 'epoch_checkpoint_template'。") + checkpoint_path = template.format(N=epoch_or_label) # 保存检查点 torch.save(checkpoint_data, checkpoint_path) @@ -148,12 +152,26 @@ def train_product_model_with_transformer( # 默认:加载所有店铺的产品数据 product_df = load_multi_store_data('pharmacy_sales_multi_store.csv', product_id=product_id) training_scope = "所有店铺" + except ValueError as e: + if "No objects to concatenate" in str(e): + err_msg = f"聚合数据失败 (product: {product_id}, store: {store_id}, mode: {training_mode}): 没有找到可聚合的数据。" + emit_progress(err_msg) + # 在这种情况下,我们不能继续,所以抛出异常 + raise ValueError(err_msg) from e + # 对于其他 ValueError,也打印并重新抛出 + emit_progress(f"数据加载时发生值错误: {e}") + raise e except Exception as e: - print(f"多店铺数据加载失败: {e}") + emit_progress(f"多店铺数据加载失败: {e}, 尝试后备方案...") # 后备方案:尝试原始数据 - df = pd.read_excel('pharmacy_sales.xlsx') - product_df = df[df['product_id'] == product_id].sort_values('date') - training_scope = "原始数据" + try: + df = pd.read_excel('pharmacy_sales.xlsx') + product_df = df[df['product_id'] == product_id].sort_values('date') + training_scope = "原始数据" + emit_progress("成功从 'pharmacy_sales.xlsx' 加载后备数据。") + except Exception as fallback_e: + emit_progress(f"后备数据加载失败: {fallback_e}") + raise fallback_e from e else: # 如果传入了product_df,直接使用 if training_mode == 'store' and store_id: @@ -187,7 +205,7 @@ def train_product_model_with_transformer( print(f"[Transformer] 训练产品 '{product_name}' (ID: {product_id}) 的销售预测模型", flush=True) print(f"[Device] 使用设备: {DEVICE}", flush=True) - print(f"[Model] 模型将保存到目录: {path_info['version_dir']}", flush=True) + print(f"[Model] 模型将保存到: {path_info['base_dir']}", flush=True) # 创建特征和目标变量 features = ['sales', 'weekday', 'month', 'is_holiday', 'is_weekend', 'is_promotion', 'temperature'] @@ -391,18 +409,19 @@ def train_product_model_with_transformer( } } - # 保存检查点 - save_checkpoint(checkpoint_data, epoch + 1, path_info) - - # 如果是最佳模型,额外保存一份 - if test_loss < best_loss: + # 检查是否为最佳模型 + is_best = test_loss < best_loss + if is_best: best_loss = test_loss + epochs_no_improve = 0 + # 保存最佳模型检查点 save_checkpoint(checkpoint_data, 'best', path_info) emit_progress(f"💾 保存最佳模型检查点 (epoch {epoch+1}, test_loss: {test_loss:.4f})") - epochs_no_improve = 0 else: epochs_no_improve += 1 - + + # 保存定期的epoch检查点(如果不是最佳模型,或者即时是最佳也保存一份epoch版本) + save_checkpoint(checkpoint_data, epoch + 1, path_info) emit_progress(f"💾 保存训练检查点 epoch_{epoch+1}") if (epoch + 1) % 10 == 0: @@ -491,14 +510,18 @@ def train_product_model_with_transformer( progress_manager.set_stage("model_saving", 50) - # 保存最终模型 - final_model_path = path_info['model_path'] - torch.save(final_model_data, final_model_path) - - progress_manager.set_stage("model_saving", 100) - emit_progress(f"模型已保存到 {final_model_path}") - - print(f"💾 模型已保存到 {final_model_path}", flush=True) + # 检查模型性能是否达标 + # 移除R2检查,始终保存模型 + if metrics: + # 保存最终模型 + final_model_path = path_info['model_path'] + torch.save(final_model_data, final_model_path) + progress_manager.set_stage("model_saving", 100) + emit_progress(f"模型已保存到 {final_model_path}") + print(f"💾 模型已保存到 {final_model_path}", flush=True) + else: + final_model_path = None + print(f"[Transformer] 训练过程中未生成评估指标,不保存最终模型。", flush=True) # 准备最终返回的指标 final_metrics = { @@ -510,5 +533,10 @@ def train_product_model_with_transformer( 'training_time': training_time, 'final_epoch': epochs } + + if final_model_path: + emit_progress(f"✅ Transformer模型训练完成!", progress=100, metrics=final_metrics) + else: + emit_progress(f"❌ Transformer模型训练失败:性能不达标", progress=100, metrics={'error': '模型性能不佳'}) - return model, final_metrics, epochs \ No newline at end of file + return model, metrics, epochs, final_model_path \ No newline at end of file diff --git a/server/utils/data_utils.py b/server/utils/data_utils.py index 2bbc5a8..e248fc5 100644 --- a/server/utils/data_utils.py +++ b/server/utils/data_utils.py @@ -60,7 +60,7 @@ def prepare_data(product_data, sequence_length=30, forecast_horizon=7): scaler_X, scaler_y: 特征和目标的归一化器 """ # 创建特征和目标变量 - features = ['sales', 'price', 'weekday', 'month', 'is_holiday', 'is_weekend', 'is_promotion', 'temperature'] + features = ['sales', 'weekday', 'month', 'is_holiday', 'is_weekend', 'is_promotion', 'temperature'] # 预处理数据 X_raw = product_data[features].values diff --git a/server/utils/file_save.py b/server/utils/file_save.py index 6496e9b..b3f6472 100644 --- a/server/utils/file_save.py +++ b/server/utils/file_save.py @@ -115,7 +115,7 @@ class ModelPathManager: raise ValueError(f"未知的全局训练范围: {training_scope}") aggregation_method = kwargs.get('aggregation_method', 'sum') - return f"global_{scope_part}_agg_{aggregation_method}" + return f"global_{scope_part}_{aggregation_method}" else: raise ValueError(f"未知的训练模式: {training_mode}") @@ -176,7 +176,7 @@ class ModelPathManager: def get_model_paths(self, training_mode: str, model_type: str, **kwargs: Any) -> Dict[str, Any]: """ 主入口函数:为一次新的训练获取所有相关路径和版本信息。 - 此方法会生成唯一的模型标识符,获取新版本号,并构建所有产物的完整路径。 + 此方法遵循扁平化文件存储规范,将逻辑路径编码到文件名中。 Args: training_mode (str): 训练模式 ('product', 'store', 'global')。 @@ -186,79 +186,71 @@ class ModelPathManager: Returns: Dict[str, Any]: 一个包含所有路径和关键信息的字典。 """ - # 1. 生成唯一标识符,并加上模型类型,确保不同模型类型有不同的版本控制 + # 1. 生成不含模型类型和版本的核心标识符,并将其中的分隔符替换为下划线 + # 例如:product/P001/all -> product_P001_all base_identifier = self._generate_identifier(training_mode, **kwargs) - full_identifier = f"{base_identifier}_{model_type}" + + # 规范化处理,将 'scope' 'products' 等关键字替换为更简洁的形式 + # 例如 product_P001_scope_all -> product_P001_all + core_prefix = base_identifier.replace('_scope_', '_').replace('_products_', '_') - # 2. 获取下一个版本号 - next_version = self.get_next_version(full_identifier) + # 2. 构建用于版本控制的完整标识符 (不含版本号) + # 例如: product_P001_all_mlstm + version_control_identifier = f"{core_prefix}_{model_type}" + + # 3. 获取下一个版本号 + next_version = self.get_next_version(version_control_identifier) version_str = f"v{next_version}" - # 3. 根据规则构建基础路径 - if training_mode == 'product': - product_id = kwargs.get('product_id') - store_id = kwargs.get('store_id') - scope = store_id if store_id is not None else 'all' - scope_folder = f"{product_id}_{scope}" - path_parts = [training_mode, scope_folder, model_type, version_str] - - elif training_mode == 'store': - store_id = kwargs.get('store_id') - product_scope = kwargs.get('product_scope', 'all') - if product_scope == 'specific': - product_ids = kwargs.get('product_ids', []) - # 如果只有一个ID,直接使用ID;否则使用哈希 - scope = product_ids[0] if len(product_ids) == 1 else self._hash_ids(product_ids) - else: - scope = 'all' - scope_folder = f"{store_id}_{scope}" - path_parts = [training_mode, scope_folder, model_type, version_str] + # 4. 构建最终的文件名前缀,包含版本号 + # 例如: product_P001_all_mlstm_v2 + filename_prefix = f"{version_control_identifier}_{version_str}" - elif training_mode == 'global': - aggregation_method = kwargs.get('aggregation_method', 'sum') - training_scope = kwargs.get('training_scope', 'all') - - scope_parts = [training_mode] - if training_scope in ['all', 'all_stores_all_products']: - scope_parts.append('all') - elif training_scope == 'selected_stores': - store_ids = kwargs.get('store_ids', []) - scope_id = store_ids[0] if len(store_ids) == 1 else self._hash_ids(store_ids) - scope_parts.extend(['stores', scope_id]) - elif training_scope == 'selected_products': - product_ids = kwargs.get('product_ids', []) - scope_id = product_ids[0] if len(product_ids) == 1 else self._hash_ids(product_ids) - scope_parts.extend(['products', scope_id]) - elif training_scope == 'custom': - store_ids = kwargs.get('store_ids', []) - product_ids = kwargs.get('product_ids', []) - s_id = store_ids[0] if len(store_ids) == 1 else self._hash_ids(store_ids) - p_id = product_ids[0] if len(product_ids) == 1 else self._hash_ids(product_ids) - scope_parts.extend(['custom', s_id, p_id]) + # 5. 确保 `saved_models` 和 `saved_models/checkpoints` 目录存在 + checkpoints_base_dir = os.path.join(self.base_dir, 'checkpoints') + os.makedirs(self.base_dir, exist_ok=True) + os.makedirs(checkpoints_base_dir, exist_ok=True) - scope_parts.extend([aggregation_method, model_type, version_str]) - path_parts = scope_parts - - else: - raise ValueError(f"未知的训练模式: {training_mode}") - - # 4. 创建版本目录 - version_dir = os.path.join(self.base_dir, *path_parts) - os.makedirs(version_dir, exist_ok=True) - - # 创建检查点子目录 - checkpoint_dir = os.path.join(version_dir, 'checkpoints') - os.makedirs(checkpoint_dir, exist_ok=True) - - # 5. 构建并返回包含所有信息的字典 + # 6. 构建并返回包含所有扁平化路径和关键信息的字典 return { - "identifier": full_identifier, + "identifier": version_control_identifier, # 用于版本控制的key + "filename_prefix": filename_prefix, # 用于数据库和文件查找 "version": next_version, "base_dir": self.base_dir, - "version_dir": version_dir, - "model_path": os.path.join(version_dir, "model.pth"), - "metadata_path": os.path.join(version_dir, "metadata.json"), - "loss_curve_path": os.path.join(version_dir, "loss_curve.png"), - "checkpoint_dir": checkpoint_dir, - "best_checkpoint_path": os.path.join(checkpoint_dir, "checkpoint_best.pth") + "model_path": os.path.join(self.base_dir, f"{filename_prefix}_model.pth"), + "metadata_path": os.path.join(self.base_dir, f"{filename_prefix}_metadata.json"), + "loss_curve_path": os.path.join(self.base_dir, f"{filename_prefix}_loss_curve.png"), + "checkpoint_dir": checkpoints_base_dir, # 指向公共的检查点目录 + "best_checkpoint_path": os.path.join(checkpoints_base_dir, f"{filename_prefix}_checkpoint_best.pth"), + # 为动态epoch检查点提供一个格式化模板 + "epoch_checkpoint_template": os.path.join(checkpoints_base_dir, f"{filename_prefix}_checkpoint_epoch_{{N}}.pth") } + + def get_model_path_for_prediction(self, training_mode: str, model_type: str, version: int, **kwargs: Any) -> Optional[str]: + """ + 获取用于预测的已存在模型的完整路径 (遵循扁平化规范)。 + + Args: + training_mode (str): 训练模式。 + model_type (str): 模型类型。 + version (int): 模型版本号。 + **kwargs: 其他用于定位模型的参数。 + + Returns: + Optional[str]: 模型的完整路径,如果不存在则返回None。 + """ + # 1. 生成不含模型类型和版本的核心标识符 + base_identifier = self._generate_identifier(training_mode, **kwargs) + core_prefix = base_identifier.replace('_scope_', '_').replace('_products_', '_') + + # 2. 构建用于版本控制的标识符 + version_control_identifier = f"{core_prefix}_{model_type}" + + # 3. 构建完整的文件名前缀 + version_str = f"v{version}" + filename_prefix = f"{version_control_identifier}_{version_str}" + + # 4. 构建模型文件的完整路径 + model_path = os.path.join(self.base_dir, f"{filename_prefix}_model.pth") + + return model_path if os.path.exists(model_path) else None diff --git a/server/utils/model_manager.py b/server/utils/model_manager.py index 7f50ce2..6c9aba9 100644 --- a/server/utils/model_manager.py +++ b/server/utils/model_manager.py @@ -321,6 +321,24 @@ class ModelManager: 'aggregation_method': aggregation_method } + # 兼容以 _model.pth 结尾的格式 + elif base_name.endswith('_model'): + name_part = base_name.rsplit('_model', 1)[0] + parts = name_part.split('_') + # 假设格式为 {product_id}_{...}_{model_type}_{version} + if len(parts) >= 3: + version = parts[-1] + model_type = parts[-2] + product_id = '_'.join(parts[:-2]) # The rest is product_id + scope + return { + 'model_type': model_type, + 'product_id': product_id, + 'version': version, + 'training_mode': 'product', # Assumption + 'store_id': None, + 'aggregation_method': None + } + # 兼容旧格式 else: # 尝试解析其他格式 @@ -345,7 +363,7 @@ class ModelManager: 'store_id': None, 'aggregation_method': None } - + except Exception as e: print(f"解析文件名失败 {filename}: {e}") diff --git a/server/utils/multi_store_data_utils.py b/server/utils/multi_store_data_utils.py index 8e512f8..b95de06 100644 --- a/server/utils/multi_store_data_utils.py +++ b/server/utils/multi_store_data_utils.py @@ -324,29 +324,21 @@ def aggregate_multi_store_data(product_id: Optional[str] = None, grouping_entity = "所有产品" # 按日期聚合(使用标准化后的列名) - agg_dict = {} - if aggregation_method == 'sum': - agg_dict = { - 'sales': 'sum', # 标准化后的销量列 - 'sales_amount': 'sum', - 'price': 'mean' # 标准化后的价格列,取平均值 - } - elif aggregation_method == 'mean': - agg_dict = { - 'sales': 'mean', - 'sales_amount': 'mean', - 'price': 'mean' - } - elif aggregation_method == 'median': - agg_dict = { - 'sales': 'median', - 'sales_amount': 'median', - 'price': 'median' - } + # 定义一个更健壮的聚合规范,以保留所有特征 + agg_spec = { + 'sales': aggregation_method, + 'sales_amount': aggregation_method, + 'price': 'mean', + 'weekday': 'first', + 'month': 'first', + 'is_holiday': 'first', + 'is_weekend': 'first', + 'is_promotion': 'first', + 'temperature': 'mean' + } - # 确保列名存在 - available_cols = df.columns.tolist() - agg_dict = {k: v for k, v in agg_dict.items() if k in available_cols} + # 只聚合DataFrame中存在的列 + agg_dict = {k: v for k, v in agg_spec.items() if k in df.columns} # 聚合数据 aggregated_df = df.groupby('date').agg(agg_dict).reset_index() diff --git a/server/utils/training_process_manager.py b/server/utils/training_process_manager.py index 0891a79..1cdf4c5 100644 --- a/server/utils/training_process_manager.py +++ b/server/utils/training_process_manager.py @@ -46,6 +46,8 @@ class TrainingTask: training_mode: str store_id: Optional[str] = None aggregation_method: Optional[str] = None # 新增:聚合方式 + product_scope: str = 'all' + product_ids: Optional[list] = None epochs: int = 100 status: str = "pending" # pending, running, completed, failed start_time: Optional[str] = None @@ -149,6 +151,8 @@ class TrainingWorker: store_id=task.store_id, training_mode=task.training_mode, aggregation_method=task.aggregation_method, # 传递聚合方式 + product_scope=task.product_scope, # 传递药品范围 + product_ids=task.product_ids, # 传递药品ID列表 socketio=None, # 子进程中不能直接使用socketio task_id=task.task_id, progress_callback=progress_callback, # 传递进度回调函数 @@ -182,18 +186,29 @@ class TrainingWorker: } training_logger.warning("⚠️ 使用模拟训练结果") - # 训练完成 - task.status = "completed" - task.end_time = time.strftime('%Y-%m-%d %H:%M:%S') - task.progress = 100.0 - task.metrics = metrics - task.message = "训练完成" - - training_logger.success(f"✅ 训练任务完成 - 耗时: {task.end_time}") + # 检查训练是否成功 if metrics: + # 训练成功 + task.status = "completed" + task.end_time = time.strftime('%Y-%m-%d %H:%M:%S') + task.progress = 100.0 + task.metrics = metrics + task.message = "训练完成" + + training_logger.success(f"✅ 训练任务完成 - 耗时: {task.end_time}") training_logger.info(f"📊 训练指标: {metrics}") - - self.result_queue.put(('complete', asdict(task))) + + self.result_queue.put(('complete', asdict(task))) + else: + # 训练失败(性能不佳) + # 即使性能不佳,也标记为完成,让用户决定是否使用 + task.status = "completed" + task.end_time = time.strftime('%Y-%m-%d %H:%M:%S') + task.metrics = metrics if metrics else {} + task.message = "训练完成(性能可能不佳)" + + training_logger.warning(f"⚠️ 训练完成,但性能可能不佳 (metrics: {metrics})") + self.result_queue.put(('complete', asdict(task))) except Exception as e: error_msg = str(e) @@ -305,6 +320,8 @@ class TrainingProcessManager: store_id=training_params.get('store_id'), epochs=training_params.get('epochs', 100), aggregation_method=training_params.get('aggregation_method'), # 新增 + product_scope=training_params.get('product_scope', 'all'), + product_ids=training_params.get('product_ids'), path_info=path_info # 存储路径信息 ) @@ -362,16 +379,20 @@ class TrainingProcessManager: # 如果任务成功完成,则更新版本文件和任务对象中的版本号 if action == 'complete': - if task.path_info: - identifier = task.path_info.get('identifier') - version = task.path_info.get('version') - if identifier and version: - try: - self.path_manager.save_version_info(identifier, version) - self.logger.info(f"✅ 版本信息已更新: identifier={identifier}, version={version}") - task.version = version # 关键修复:将版本号保存到任务对象中 - except Exception as e: - self.logger.error(f"❌ 更新版本文件失败: {e}") + # 只有在训练成功(metrics有效)时才保存版本信息 + if task.metrics and task.metrics.get('r2', -1) >= 0: + if task.path_info: + identifier = task.path_info.get('identifier') + version = task.path_info.get('version') + if identifier and version: + try: + self.path_manager.save_version_info(identifier, version) + self.logger.info(f"✅ 版本信息已更新: identifier={identifier}, version={version}") + task.version = version # 关键修复:将版本号保存到任务对象中 + except Exception as e: + self.logger.error(f"❌ 更新版本文件失败: {e}") + else: + self.logger.warning(f"⚠️ 任务 {task_id} 训练性能不佳或失败,不保存版本信息。") # WebSocket通知 - 使用已转换的数据 if self.websocket_callback: @@ -395,7 +416,9 @@ class TrainingProcessManager: 'end_time': serializable_task_data.get('end_time'), 'product_id': serializable_task_data.get('product_id'), 'model_type': serializable_task_data.get('model_type'), - 'version': version # 添加版本号 + 'version': version, # 添加版本号 + 'product_scope': serializable_task_data.get('product_scope'), + 'product_ids': serializable_task_data.get('product_ids') }) # 额外发送一个完成事件,确保前端能收到 self.websocket_callback('training_completed', { @@ -406,7 +429,9 @@ class TrainingProcessManager: 'metrics': serializable_task_data.get('metrics'), 'product_id': serializable_task_data.get('product_id'), 'model_type': serializable_task_data.get('model_type'), - 'version': version # 添加版本号 + 'version': version, # 添加版本号 + 'product_scope': serializable_task_data.get('product_scope'), + 'product_ids': serializable_task_data.get('product_ids') }) elif action == 'error': # 训练失败 @@ -418,7 +443,9 @@ class TrainingProcessManager: 'message': serializable_task_data.get('message', '训练失败'), 'error': serializable_task_data.get('error'), 'product_id': serializable_task_data.get('product_id'), - 'model_type': serializable_task_data.get('model_type') + 'model_type': serializable_task_data.get('model_type'), + 'product_scope': serializable_task_data.get('product_scope'), + 'product_ids': serializable_task_data.get('product_ids') }) else: # 状态更新 @@ -430,7 +457,9 @@ class TrainingProcessManager: 'message': serializable_task_data.get('message', ''), 'metrics': serializable_task_data.get('metrics'), 'product_id': serializable_task_data.get('product_id'), - 'model_type': serializable_task_data.get('model_type') + 'model_type': serializable_task_data.get('model_type'), + 'product_scope': serializable_task_data.get('product_scope'), + 'product_ids': serializable_task_data.get('product_ids') }) except Exception as e: self.logger.error(f"WebSocket通知失败: {e}") diff --git a/xz训练模型保存规则.md b/xz训练模型保存规则.md index 8a034aa..d0407ca 100644 --- a/xz训练模型保存规则.md +++ b/xz训练模型保存规则.md @@ -1,194 +1,95 @@ -跟文件夹:save_models +# 扁平化模型数据处理规范 (最终版) - -## 按药品训练 ## -1.创建 product 文件夹 -2.选择药品 product下创建药品id 文件夹,根据数据范围加上相应的后缀,聚合所有店铺all,指定店铺就店铺id -3.模型类型 对应的文件下创建模型名称的文件夹 -4.在模型名称的文件夹下,版本文件夹version+第几次训练 -5.在版本文件下存储对应的检查点文件,最终模型文件,损失曲线图 - -## 按店铺训练 ## - -1.创建 store 文件夹 -2.选择店铺 store下创建店铺id 文件夹,根据药品范围加上相应的后缀,所有药品all,指定药品就药品id -3.模型类型 对应的文件下创建模型名称的文件夹 -4.在模型名称的文件夹下,版本文件夹version+第几次训练 -5.在版本文件下存储对应的检查点文件,最终模型文件,损失曲线图 - - -## 按全局训练 ## - -1.创建 global 文件夹 -2.选择训练范围时 创建文件夹根据数据范围,所有店铺所有药品为all,选择店铺就店铺id,选择药品就药品id ,自定义范围就根据下面的店铺id创建,再在店铺id文件夹下创建对应的药品id文件夹 -3.聚合方式 根据聚合方式创建对应的文件 -4.模型类型 对应的文件下创建模型名称的文件夹 -5.在模型名称的文件夹下,版本文件夹version+第几次训练 -6.在版本文件下存储对应的检查点文件,最终模型文件,损失曲线图 +**版本**: 4.0 (最终版) +**核心思想**: 逻辑路径被转换为文件名的一部分,实现极致扁平化的文件存储。 --- -## 优化后模型保存规则分析总结 +## 一、 文件保存规则 -与当前系统中将模型信息编码到文件名并将文件存储在相对扁平目录中的做法相比,新规则引入了一套更具结构化和层级化的模型保存策略。这种优化旨在提高模型文件的可管理性、可追溯性和可扩展性。 +### 1.1. 核心原则 -### 核心思想 +所有元数据都被编码到文件名中。一个逻辑上的层级路径(例如 `product/P001_all/mlstm/v2`)应该被转换为一个用下划线连接的文件名前缀(`product_P001_all_mlstm_v2`)。 -优化后的核心思想是**“目录即元数据”**。通过创建层级分明的目录结构,将模型的训练模式、范围、类型和版本等关键信息体现在目录路径中,而不是仅仅依赖于文件名。所有与单次训练相关的产物(最终模型、检查点、损失曲线图等)都将被统一存放在同一个版本文件夹下,便于管理和溯源。 +### 1.2. 文件存储位置 -### 统一根目录 +- **最终产物**: 所有最终模型、元数据文件、损失图等,统一存放在 `saved_models/` 根目录下。 +- **过程文件**: 所有训练过程中的检查点文件,统一存放在 `saved_models/checkpoints/` 目录下。 -所有模型都将保存在 `saved_models` 文件夹下。 +### 1.3. 文件名生成规则 -### 优化后的目录结构规则 +1. **构建逻辑路径**: 根据训练参数(模式、范围、类型、版本)确定逻辑路径。 + - *示例*: `product/P001_all/mlstm/v2` -#### 1. 按药品训练 (Product Training) +2. **生成文件名前缀**: 将逻辑路径中的所有 `/` 替换为 `_`。 + - *示例*: `product_P001_all_mlstm_v2` -* **目录结构**: `saved_models/product/{product_id}_{scope}/{model_type}/v{N}/` -* **路径解析**: - * `product`: 表示这是按“药品”为核心的训练模式。 - * `{product_id}_{scope}`: - * `{product_id}`: 训练的药品ID 。 - * `{scope}`: 数据的店铺范围。 - * `all`: 使用所有店铺的聚合数据。 - * `{store_id}`: 使用指定店铺的数据。 - * `{model_type}`: 模型的类型 (例如 `mlstm`, `transformer`)。 - * `v{N}`: 模型的版本号 (例如 `v1`, `v2`)。 -* **文件夹内容**: - * 最终模型文件 (例如 `model_final.pth`) - * 训练检查点文件 (例如 `checkpoint_epoch_10.pth`, `checkpoint_best.pth`) - * 损失曲线图 (例如 `loss_curve.png`) +3. **拼接文件后缀**: 在前缀后加上描述文件类型的后缀。 + - `_model.pth` + - `_metadata.json` + - `_loss_curve.png` + - `_checkpoint_best.pth` + - `_checkpoint_epoch_{N}.pth` -#### 2. 按店铺训练 (Store Training) +#### **完整示例:** -* **目录结构**: `saved_models/store/{store_id}_{scope}/{model_type}/v{N}/` -* **路径解析**: - * `store`: 表示这是按“店铺”为核心的训练模式。 - * `{store_id}_{scope}`: - * `{store_id}`: 训练的店铺ID 。 - * `{scope}`: 数据的药品范围。 - * `all`: 使用该店铺所有药品的聚合数据。 - * `{product_id}`: 使用该店铺指定药品 - * `v{N}`: 模型的版本号。 -* **文件夹内容**: 与“按药品训练”模式相同。 - -#### 3. 全局训练 (Global Training) - -* **目录结构**: `saved_models/global/{scope_path}/{aggregation_method}/{model_type}/v{N}/` -* **路径解析**: - * `global`: 表示这是“全局”训练模式。 - * `{scope_path}`: 描述训练所用数据的范围,结构比较灵活: - * `all`: 代表所有店铺的所有药品。 - * `stores/{store_id}`: 代表选择了特定的店铺。 - * `products/{product_id}`: 代表选择了特定的药品。 - * `custom/{store_id}/{product_id}`: 代表自定义范围,同时指定了店铺和药品。 - * `{aggregation_method}`: 数据的聚合方式 (例如 `sum`, `mean`)。 - * `{model_type}`: 模型的类型。 - * `v{N}`: 模型的版本号。 -* **文件夹内容**: 与“按药品训练”模式相同。 - -### 总结 - -总的来说,优化后的规则通过一个清晰、自解释的目录结构,系统化地组织了所有训练产物。这不仅使得查找和管理特定模型变得极为方便,也为未来的自动化模型管理和部署流程奠定了坚实的基础。 +- **最终模型**: `saved_models/product_P001_all_mlstm_v2_model.pth` +- **元数据**: `saved_models/product_P001_all_mlstm_v2_metadata.json` +- **最佳检查点**: `saved_models/checkpoints/product_P001_all_mlstm_v2_checkpoint_best.pth` +- **Epoch 50 检查点**: `saved_models/checkpoints/product_P001_all_mlstm_v2_checkpoint_epoch_50.pth` --- -### 优化规则下的详细文件保存、读取及数据库记录规范 +## 二、 文件读取规则 -基于优化后的目录结构规则,我们进一步定义详细的文件保存、读取、数据库记录及版本管理的具体规范。 +1. **确定模型元数据**: 根据需求确定要加载的模型的训练模式、范围、类型和版本。 +2. **构建文件名前缀**: 按照与保存时相同的逻辑,将元数据拼接成文件名前缀(例如 `product_P001_all_mlstm_v2`)。 +3. **定位文件**: + - 要加载最终模型,查找文件: `saved_models/{prefix}_model.pth`。 + - 要加载最佳检查点,查找文件: `saved_models/checkpoints/{prefix}_checkpoint_best.pth`。 -#### 一、 详细文件保存路径规则 +--- -所有训练产物都保存在对应模型的版本文件夹内,并采用统一的命名约定。 +## 三、 数据库存储规则 -* **最终模型文件**: `model.pth` -* **最佳性能检查点**: `checkpoint_best.pth` -* **定期检查点**: `checkpoint_epoch_{epoch_number}.pth` (例如: `checkpoint_epoch_50.pth`) -* **损失曲线图**: `loss_curve.png` -* **训练元数据**: `metadata.json` (包含训练参数、指标等详细信息) +数据库用于索引,应存储足以重构文件名前缀的关键元数据。 -**示例路径:** - -1. **按药品训练 (P001, 所有店铺, mlstm, v2)**: - * **目录**: `saved_models/product/P001_all/mlstm/v2/` - * **最终模型**: `saved_models/product/P001_all/mlstm/v2/model.pth` - * **损失曲线**: `saved_models/product/P001_all/mlstm/v2/loss_curve.png` - -2. **按店铺训练 (S001, 指定药品P002, transformer, v1)**: - * **目录**: `saved_models/store/S001_P002/transformer/v1/` - * **最终模型**: `saved_models/store/S001_P002/transformer/v1/model.pth` - -3. **全局训练 (所有数据, sum聚合, kan, v5)**: - * **目录**: `saved_models/global/all/sum/kan/v5/` - * **最终模型**: `saved_models/global/all/sum/kan/v5/model.pth` - -#### 二、 文件读取规则 - -读取模型或其产物时,首先根据模型的元数据构建其版本目录路径,然后在该目录内定位具体文件。 - -**读取逻辑:** - -1. **确定模型元数据**: - * 训练模式 (`product`, `store`, `global`) - * 范围 (`{product_id}_{scope}`, `{store_id}_{scope}`, `{scope_path}`) - * 聚合方式 (仅全局模式) - * 模型类型 (`mlstm`, `kan`, etc.) - * 版本号 (`v{N}`) - -2. **构建模型根目录路径**: 根据上述元数据拼接路径。 - * *示例*: 要读取“店铺S001下P002药品的transformer模型v1”,构建路径 `saved_models/store/S001_P002/transformer/v1/`。 - -3. **定位具体文件**: 在构建好的目录下直接读取所需文件。 - * **加载最终模型**: 读取 `model.pth`。 - * **加载最佳模型**: 读取 `checkpoint_best.pth`。 - * **查看损失曲线**: 读取 `loss_curve.png`。 - -#### 三、 数据库保存规则 - -数据库的核心职责是**索引模型**,而不是存储冗余信息。因此,数据库中只保存足以定位到模型版本目录的**路径**信息。 - -**`model_versions` 表结构优化:** +#### **`models` 表结构建议:** | 字段名 | 类型 | 描述 | 示例 | | :--- | :--- | :--- | :--- | | `id` | INTEGER | 主键 | 1 | -| `model_identifier` | TEXT | 模型的唯一标识符,由模式和范围构成 | `product_P001_all` | -| `model_type` | TEXT | 模型类型 | `mlstm` | -| `version` | TEXT | 版本号 | `v2` | -| `model_path` | TEXT | **模型版本目录的相对路径** | `saved_models/product/P001_all/mlstm/v2/` | -| `created_at` | TEXT | 创建时间 | `2025-07-15 18:40:00` | +| `filename_prefix` | TEXT | **完整文件名前缀,可作为唯一标识** | `product_P001_all_mlstm_v2` | +| `model_identifier`| TEXT | 用于版本控制的标识符 (不含版本) | `product_P001_all_mlstm` | +| `version` | INTEGER | 版本号 | `2` | +| `status` | TEXT | 模型状态 | `completed`, `training`, `failed` | +| `created_at` | TEXT | 创建时间 | `2025-07-21 02:29:00` | | `metrics_summary`| TEXT | 关键性能指标的JSON字符串 | `{"rmse": 10.5, "r2": 0.89}` | -**保存逻辑:** +#### **保存逻辑:** +- 训练完成后,向表中插入一条记录。`filename_prefix` 字段是查找与该次训练相关的所有文件的关键。 -* 当一次训练成功完成并生成版本 `v{N}` 后,向 `model_versions` 表中插入一条新记录。 -* `model_path` 字段**只记录到版本目录**,如 `saved_models/product/P001_all/mlstm/v2/`。应用程序根据此路径和标准文件名(如 `model.pth`)来加载具体文件。 +--- -#### 四、 版本记录文件规则 +## 四、 版本记录规则 -为了快速、方便地获取和递增版本号,在 `saved_models` 根目录下维护一个版本记录文件。 +版本管理依赖于根目录下的 `versions.json` 文件,以实现原子化、线程安全的版本号递增。 -* **文件名**: `versions.json` -* **位置**: `saved_models/versions.json` -* **结构**: 一个JSON对象,`key` 是模型的唯一标识符,`value` 是该模型的**最新版本号 (整数)**。 +- **文件名**: `versions.json` +- **位置**: `saved_models/versions.json` +- **结构**: 一个JSON对象,`key` 是不包含版本号的标识符,`value` 是该标识符下最新的版本号(整数)。 + - **Key**: `{prefix_core}_{model_type}` (例如: `product_P001_all_mlstm`) + - **Value**: `Integer` -**`versions.json` 示例:** +#### **`versions.json` 示例:** ```json { "product_P001_all_mlstm": 2, - "store_S001_P002_transformer": 1, - "global_all_sum_kan": 5 + "store_S001_P002_transformer": 1 } ``` -**版本管理流程:** +#### **版本管理流程:** -1. **获取下一个版本号**: - * 在开始新训练前,根据训练参数构建模型的唯一标识符 (例如 `product_P001_all_mlstm`)。 - * 读取 `saved_models/versions.json` 文件。 - * 查找对应的 `key`,获取当前最新版本号。如果 `key` 不存在,则当前版本为 0。 - * 下一个版本号即为 `当前版本号 + 1`。 - -2. **更新版本号**: - * 训练成功后,将新的版本号写回到 `saved_models/versions.json` 文件中,更新对应 `key` 的 `value`。 - * 这个过程需要加锁以防止并发训练时出现版本号冲突。 \ No newline at end of file +1. **获取新版本**: 开始训练前,构建 `key`。读取 `versions.json`,找到对应 `key` 的 `value`。新版本号为 `value + 1` (若key不存在,则为 `1`)。 +2. **更新版本**: 训练成功后,将新的版本号写回到 `versions.json`。此过程**必须使用文件锁**以防止并发冲突。 \ No newline at end of file