From 120caba3cd8a52253917a99201c67880648e9f06 Mon Sep 17 00:00:00 2001 From: LYFxiaoan Date: Fri, 18 Jul 2025 13:14:34 +0800 Subject: [PATCH] =?UTF-8?q?=E9=A2=84=E6=B5=8B=E6=9D=BF=E5=9D=97=E7=89=88?= =?UTF-8?q?=E6=9C=AC=E6=A8=A1=E5=9E=8B=E7=BB=9F=E4=B8=80=E8=A7=84=E8=8C=83?= =?UTF-8?q?=E5=91=BD=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- prediction_history.db | Bin 450560 -> 573440 bytes server/api.py | 13 +- server/core/config.py | 120 +++----------- server/trainers/kan_trainer.py | 13 +- server/trainers/mlstm_trainer.py | 144 ++++------------- server/trainers/tcn_trainer.py | 206 +++++-------------------- server/trainers/transformer_trainer.py | 180 ++++----------------- server/utils/model_manager.py | 64 +++++--- 8 files changed, 177 insertions(+), 563 deletions(-) diff --git a/prediction_history.db b/prediction_history.db index 2295ab09db1e02fac8465e8550f806efa50654dd..e2372473c70d0c3f36a9ad79301b6b90376bc74f 100644 GIT binary patch delta 27811 zcmeHQd5~RInZNhldtW-~?$_xh=}tOJXThYCzAtxQIzR%*G8zjKK`n!lw`w4S06`!j z>3$GaLjq~c2_uo!QLAJLnW9}7D-JV`ibgC)bY>hDr>5L+3?ojJQy}yE&V9?h_q}vp z^T&u)T?Mz_J>Nb1`JL}O-}#-B(PRCiqf74TSMC=CVeh>IYsCAHY)TOBpr#7TMkh&mscIxz$lhRTvQcF_{QhlkeR9h;U659W_{f+kjYX4>X z&)a|2{$l$JM~}?xla9XG{m;`!kkVbHZanWSb>VqOsT0rJOEdAjt<-_%t)x0KrP zyt&kdXQ|YR=SXP=p1Vua@!VBv!SkllG(2~fl6dYYDR|yklJUHuB;k2|DS_vwN+O=u zmGBU@BlCH}wY%~2nq7EYeG?wrcH(i>4m`Hrc=U<|pY4=?A$0uWXy@90@0imoN_t}3 zHHG}<-1f~^ZN2Q1o3Gt|)ipN?_dUH<9_YF+krkB7l`|DpIw8FzeNXz@SZdR}lzgl7 zH_`#AM-mdRC4M@#Vv}}W;vW)EDHkeTsUNrPYRfBoy zEp;I`*M*e&<@w8avr*|kcGb4+o3GrwqmaLhk{q3hx{16ZL9s_F0fwgg8tS@xE}{ot zAbKSBuFAKX8$`ZoY}b~)o}FFjCmy5P{yT0tl)wE@Uc~#d2qQ~ReMnzph1nDk{f^O)STuH^>#jcIQu^78X-|0DjYqc!o2)YAUz)K}$4#B)2I zY}=4LU7XkYleV30f^uH#rHPkQXSNz5Yi$vJpXzD(L(3y=gB=^&9_#pN`@$J7u}w%Z zm3&#;n(UsjfBO02v5s5E|N5BxT0$IT-~WWXlpXsQSsBza1Fge%@;n+%jaiv8Q(M^%G+o_!&ftgn^E&l z(e?6{t{Iu4a3}u~Ir|>U;F|?BwP*Wkwvf@)oRQP?e9qG{`HXPe^6`&7FRx9YET2<@ zvS@zZbBh|f$kI?1%|tx2XsMd%*jmvyouXC{_EQdqn{~4~Ix}nMbLfqnt9seIrrK7q zU}F04eZ45`<7LSdi&`dUqGdFEX=%FZS?GzZoh_I}Q}=8=FYH~;)*P2tqFImpn;i7F z=H-j%2o&11eAUWPyJ*>hn$M%NO)G0VzAN0rzIt5Vjqd*A&&u6xdr{@ z$P3zk)BgAE_qT6rKfis!G`;N)ZQpBqR6fzRqiucL^47OoUu=C`nLF)It$P&Ky0O)0 zoj&808RIP%&DbY@Yer$l>CzQdEoqlNgRnyN)=BLkZd86g| zWS8=*@!c=Tw@G{u`9&{h=((I)uuOm>Z`r6z7R{D*953SnI=(I3N3|~MMFWj)s=x)h zBjf05#-bscv9pfl_?!}3hmvU3f)%5&G0e z>>P$n)(1c_1PEh}t7hz6p_tR%qHAlyU6g3S(=%o+pH&Na8;!1;mYOMKbyc&xtc9XE zz>shUeVNVYb;mJ~rfa#XRq!#w9Dvs}Fyd?@>lJ-pxP4F>xg-JP>%}5!Qsi~=G4MUY zk?Cr#?q;*D?zqApO4P$(1XPNu>sT0(Hk!!y{j8eHWId~>dBwb`3AYedUAK_MteaJR zGz%bPWK=)zn3x;%obBc8B1ZcNeK?Xsb?gk<$>*b7%NA8XXCMo=h#?9f=8FDq`qIZ7 zf|#aiI5yDbW>8G6m;;C{)6Qra-7=kB^kFgQ`I_$J)qFOCy!9eVse2kebc#jbzJTO+ z@(+E@Sd`H-SyVNTFC8nZdN~twiBT{;J8w9JjJxCZ)i28UUm0KhV)7rS&maRy6dx4C z2Vq`)MjU?N<2$;AOXh!|*IXo?bysKSVSC>POZ`P8eX5k%S(u>N=v(rlx!yEGGi+6J zRo%N-*M~J@*woYLRfC=N{zNxBu~_a`)0j|v*LQr!a7|A$-JyZS*Jid9u3bDlV8n&& zhaZ*G>AK*F<7C>DdF+Mfq^`+EL}s%OE|up@zKpZkwspxQ+q6{fUK!NE!L;rBwyrlQ zYgIi1MPE;p1~Gy#My|}kHy6HuIb$Fda+llo`!t4?U;MoqTR5j>>bX z{i*2Hf;3Uef%hi+hbA3O^jPP*#GF-=%1|?yt}C<0Ke0zSSdX!9DW4yyZ>&Zcl*1yu zH^Fz2)|l|S>DA61Kg$!pz(F=Th$vnRWtE^1$`EvLNVyyh} ztnMnyI$G&n_9?xl0prThgg#80hUvI^fYF&x$X!E5+OtjFu{{S15eqAm5aQx8c2*D6 z#kS|9zLjQLH#OZf9mDf1+tATqRiMUX?5sUVq7SX-!`jka+x8F-i;hsNIvFb+ zl~t6_*A3)g7@7|Y%8HgRDx>nH)5zX}ipS)rd9V^aFFYI62_q_yW`Ez*b=Wz&Zn@z~ zp&U^m(|@nb<|=R%+R1Z#-*ar!LR`lVPh0iEiSWsFU|o;IT9jJ5uDN(nys7G{i?*n% z2~Vw7gJe*J8aSZ={Wvx$VS;N2n>1S3?>-`}048;xt)G`%$Syh{E9}BclPwY=u*|bc z@4-tDvTLu>B2yBqbh>z-Xpm~$!0!3i#1|!$t_t4Z6KGPdLKh^EepH99X@p)g?Ej${<_$V-VuW;1XLq4yR+yaj6XMi{L-b`g44FC#U2=Z&3* z2T6@KV_`FOskrd{5kE`C&vaQNbV5X1!@8dKfh9~sT%7IxuoQ!#8ZW3_=)- ziUMGAINKP(fQxNm$>|%IBSMj4GN|3eEOsm}&t(aI^D7CXkJ*ohGZNs zux|TQl)}e^<7=2Y092Y+t8+)4j2$^AsSY`5$3{gptfg!lu0ix;HA-B@UfqxkjM^a= z=9F)ESYcygGi=?D7BC`Yi|$FzE^AyA(SdE{`&t7)*U2Wp^MPsNqa)E)rw)lVDK)p7 zjy4y*s%wj(fp}ff4vp#c`S|>Xd=<3>=kW$yyu?C~Yj5i0GDq7E=5hB$Hsu(V#!YdF)oy`?eWEqJ9(L@_Y z9dfssI3%r9bdtujXu?giMV8dVwzVZY0WzB`-vd~1`h0a6s8E@5;;0l|cJF7U-mz`R zmqV2z;i&dJvyt|Mjb;Uk(?;1~{km9#LXGO^4ImT^q@^*gNlicXQeuhGgx@Iy$f6(? z0mw@M$lj5-<(n(@H{WdlT(&&*!dGPZo`1Bvlpi<-xz5B43=+W z>}r@nfmT?!RRb8kht9+kK^k``L7Q5EHf;uNA@G(2u>|m5FRp%IXGw@#zRODe&FqeG z%lG_vu3ovo2_0eO)DiBw`7My-$J!Q?Hb(K zOG_KPXi&ap+9Vyjc(55F*PN+(n}lIJB1nOT5z4Xw6d-mmN6L{-H=>`2yp&j@6~o-BE}p?FgBGdO7>#ckCe^2|8kBmt^b(36wq*%vqys}+ zA}CJWmPN8Aw`F0VT5imgh0%Z6URogjNKx46Yw!o5sN9o#U&7Ku4k6^lu0AX;Vryw( ziELQsXQf4K6mK%mA!^i~-q5JUsuJxFJ&oELyzn~dNbFez`VvKlcgPFWVlzkysl_V= z@k*%0oVfaMM=23U-W8?(X2>5CAE%ouA{UsR0q;SBT*6YeNLJKIS7{C{26IS6aTK5V5wYfQP(6(o(V3oAh(>y(0MU3h+t#v>stN)O;6 zr8kv>#OwKAnaANfjg?>E1nd?UB#=~e0oZt>G2qgHKSwsF8Tel;>~In+iDeh;a{!g) zHDEJ*`Vb&57Q~AI$oBz|%S*`skhIfy7YF2J@V+!bhOYv$XW2=4JOMUQa=2Zz&~C;Q zHm9&j$T0kOGd3GKX8=GhJPc{D+i)w!)vQyw>qLEb=!cd1Lqi&SHpEM$AlDKOjx7T zLFI9o1;&czyPWM-AG0Q9uWv8&aDe?-s;A7dm;7$M?<`AjVCSjtps2(QsO0G2EQSlp z8U7=$VTD&+u1HN<-tFEi%YhE%+2Y`cX=yh2icwo4COowct#7r4ebA_RC;G6~N5mYh zs}t22f$cO(s#O_ZfgdP}l$uwZRxn z7*i;$%{%WV6gJ+_hgL+NV$DcqD{NRy?6hqNELJx8$*&4-57R%VU=?}lb8R8r7}?LC z&W%=1j|+I{*w43`IP1yTIDVXmo-JIv{Ul6uC{Bl%=wOU`W_WdpAh6SPu*!W8JXB2& zYq2_EO>yGr63z@-IM4;6qvq*ugz#M_WYyzYV8wx-3MMKDrLLo42Q#FK;<7REKYQ2< z{~^r{=|yfXeA_zcey|R(c8=y2mGRsNs>FY{T@NHy zXgiC@bfL`RGM-nk`$!vJwr13E>`JETi0YNAdy!R}*lD676>>Y5e9G~Q%rDPZKxaL!EJz@ksLpJ>$OKwRbJ768ANYGv5Bzbt3y5eq zKrl8#z}YTuG&X^+5MB=n;vv|w`@p}+mu3)NR|j;~t2+;$wto=J^CnKf68~m^J%2=* zSMT+4hIQKF2R*cWii@8Rfp*n>9P+n)7hCJVN@ys>W!&e(dEuvdSfavC^EA(-uBr?^ zA_~_;tW#1}fdmVVHl_<4_`-R0jUP6seIJz(?gQ%;JZ1pi0iP9A0z^hDhjR&w0!n!0 zh-B9w_Qy2%e1JmT0An{Y(?n$Ko5#wWR`71Y5&$>evP_&T_xvyrV=}h><>agys}&QP z<6;BJb7AsD*zVX^ipnNHBKP~m+Z%lmZBT0K2F+RDRFP^1O%SUE)i72-?97Kz2>LfQ zdYA@4rWgf<%vPjf1(a5cOiSVcEC4AvS=sgXT+U*ZTxc|&qp+T5yTINz(U_1CU zIJ`?uSpz~$Mj3XqpHn=dhu~xvmQ=cJW6}?=sp@1j#=6lok zb1^nC>b-bh;j;!^1+9 zqtp$CIH{+~s7ba}I+}{c;Q73Wy>PmCj?8z)G1|-LqcB02aj`Es9~uiuRF7SN4}-ia z$hact7s*|~2@D;PTZVs_?#OAoMRZ%G{^D*XItblnOLJG#98Fb7kh0kRER zS%tO2UxP1E2?W|0s)Qsc44OSv?2+@$MIIkF3h+dOc$N6L!hu3-57n^Gq!1HCH?Apz z!BcrHmkAR*&(TD0V_a8Cii~mAm=>c;o4LGXJpCu>G4bFliKN^iq&jd#l9F2e(0QfS zni?guclE(@O8w2GS5>347vp9U1Ggzej$3jg7Zg<(w5I2<$5V28MSS#DkzF^kgF!L3 z180rv2Y4u2To}Yl{ZUG?9UXGlszz9-Mz0qJ(T{h?ePHRuX1CBpWi@f{O4aS%U?0jKHo(!sNf(UkseP#GF4dH;``z#BZn#9ix&k@_D|w$R6Ep4M z4cw@}Od(?mOL zIi1>kngZmzACL__Y}g7W^M$NIvUkf%bq+y}R9Guq;0KQHbDP0Wb%{{1=u9MxbITU4 z3~_A_EN7h6=wT}cl5w^`hiA5+kvKP$A1f)hf}{6(k?7%A~#um#Z8x2$HFWp>-#( z2ra3?3r~u{!$K=h9!9pe&n`MYF_ZNTCTGeNNKS+vwg#^X|7QH*`;+^{KvcOoB=pA5 zPg1n%k>>H=u1xMtEN$ijICP+&_+{mrCTot+mfFjP&Co1Ol! z>BH=Vr;r_a0;BBNGn2TjqrPjZc3?~vjIy3JwNV!C<9Mv9EEv*T5N-0Ap&we<@iob= z#pSY8Q^lxS3~O^wU~OJ6h}T28U5Ue^N~t4gK2EaW7Z@$Xy(y=@IgmYkL>{Eu!oabB z2dHveSgaLscNiA7aJZT{!BD-=Xef@u2Y+n2*cGW-qj$+d-e%NE;Rm6oK*q^eZ$MNE_+A;1*H1Hun zTfyZLVH}pDRb>;vkh`cGG*&r@NW)L+vuK}{lM7d}9E3t9gBp}Uh;Xu~B8EaaCRV)AavH)Q-8XSt^AO(zTwIb7^M3^;MN50FrsL# z!?qWnkZAdb4Jw2XN56ayrWw?PP`F1B_l$SWly{25H7IcN^Cs@D??oI!|xty20$S|t`WpFP>VKPF=v#zI3R=THY@a}W-f^)K*q1zG>3JA zE;ONg>a{7(6S}I3C?dUqRX4?X!cB=Zws~vR*p%-!=HW>^G8S-&>y@Atap4gEO+Hat zns9J(js;d{3Xa#)fr?ZL|v*{$H+FhvNS zg5_TZEE^x>1_O`uA+|1nLVm*#Y*xCC6>JKav3T->ZP299m$;nz15MoV&v8j)3LR{D zesri}0pQ>#)^<2+1`<8V#>CI}FilhXPYVQKwBgj29wFD4*ACG;`{ktc`#2Av7q5Q^Y-OftVvGrHUvls?J3)57kd(zXv=m_qKpLBj0XLCC$u`q{Wiu?5c E0VS++#{d8T delta 639 zcmWNPOGs2v7{|}K_dDm_xsP-6fkcK)`5Z%Ytlg;DqFgoEKoBZ4(N!~TeB@>tngkUg zN|PJhktjr`MW9nQ;AqW43<)2VAU>eba%E1?qAY?KyNloA`z?N(|JT)(-8GZ>Fp~rk zLcT%!X&jvJhBfgbYqkXP8Oni?XiABjkNz`SrD${3;}qCI+D!{b8 zu5~uG`mS7Wy4Bg*;rG9)1bdzzLWtZT6{JWA3xh(CCTW6h&^bCpZ_*lCMNiSA-1Cz3V~?2rbCH+ykMOOI6#+F60a2yi;6~8aS7f4!?wbPKQP1H~yx{+$1luX&Aw0 zwHJzqmf^$lx)M+jIVaa~TdV?`=%=`+IFzuqp&p_A{3>4~m4jdC5o{!6n-vSNkETfj zwkkZsw&jRsblW>)e`a8S!yfZ}BjgY_pF|GhWwtw(d<$_)8*lP*K`2EIGrkE9L(R{P zmA->x7za$YWr<+Hu$eUvm#lzyg3BQeTfz!Fak$OIJS^j|5ifLu96=s~9UqQG7U2iB zl{K((maZn$S#?NlQ>)Z0!Kv&iOUkHnTe+wd z$ouk|JT4^V9=TR7m2J`&DJE7+{ZhSDCUi?_wp}y03>hGfh<@vjrb(Y`4J=5^wMOS` F{{hB=$vXf5 diff --git a/server/api.py b/server/api.py index 23616de..1e1a82b 100644 --- a/server/api.py +++ b/server/api.py @@ -56,7 +56,7 @@ from analysis.metrics import evaluate_model, compare_models # 导入配置和版本管理 from core.config import ( DEFAULT_MODEL_DIR, WEBSOCKET_NAMESPACE, - get_model_versions, get_latest_model_version, get_next_model_version, + get_model_versions, get_model_file_path, save_model_version_info ) @@ -1560,7 +1560,7 @@ def predict(): prediction_result = run_prediction(model_type, product_id, model_id, future_days, start_date, version, store_id, training_mode) if prediction_result is None: - return jsonify({"status": "error", "error": "预测失败,预测器返回None"}), 500 + return jsonify({"status": "error", "error": "模型文件未找到或加载失败"}), 404 # 添加版本信息到预测结果 prediction_result['version'] = version @@ -3782,9 +3782,14 @@ def get_model_types(): def get_model_versions_api(product_id, model_type): """获取模型版本列表API""" try: - versions = get_model_versions(product_id, model_type) - latest_version = get_latest_model_version(product_id, model_type) + from utils.model_manager import model_manager + result = model_manager.list_models(product_id=product_id, model_type=model_type) + models = result.get('models', []) + + versions = sorted(list(set(m['version'] for m in models)), key=lambda v: (v != 'best', v)) + latest_version = versions[0] if versions else None + return jsonify({ "status": "success", "data": { diff --git a/server/core/config.py b/server/core/config.py index 76a314a..871cc81 100644 --- a/server/core/config.py +++ b/server/core/config.py @@ -71,48 +71,6 @@ TRAINING_UPDATE_INTERVAL = 1 # 训练进度更新间隔(秒) # 创建模型保存目录 os.makedirs(DEFAULT_MODEL_DIR, exist_ok=True) -def get_next_model_version(product_id: str, model_type: str) -> str: - """ - 获取指定产品和模型类型的下一个版本号 - - Args: - product_id: 产品ID - model_type: 模型类型 - - Returns: - 下一个版本号,格式如 'v2', 'v3' 等 - """ - # 新格式:带版本号的文件 - pattern_new = f"{model_type}_model_product_{product_id}_v*.pth" - existing_files_new = glob.glob(os.path.join(DEFAULT_MODEL_DIR, pattern_new)) - - # 旧格式:不带版本号的文件(兼容性支持) - pattern_old = f"{model_type}_model_product_{product_id}.pth" - old_file_path = os.path.join(DEFAULT_MODEL_DIR, pattern_old) - has_old_format = os.path.exists(old_file_path) - - # 如果没有任何格式的文件,返回默认版本 - if not existing_files_new and not has_old_format: - return DEFAULT_VERSION - - # 提取新格式文件的版本号 - versions = [] - for file_path in existing_files_new: - filename = os.path.basename(file_path) - version_match = re.search(rf"_v(\d+)\.pth$", filename) - if version_match: - versions.append(int(version_match.group(1))) - - # 如果存在旧格式文件,将其视为v1 - if has_old_format: - versions.append(1) - print(f"检测到旧格式模型文件: {old_file_path},将其视为版本v1") - - if versions: - next_version_num = max(versions) + 1 - return f"v{next_version_num}" - else: - return DEFAULT_VERSION def get_model_file_path(product_id: str, model_type: str, version: str) -> str: """ @@ -134,13 +92,8 @@ def get_model_file_path(product_id: str, model_type: str, version: str) -> str: # 修正:直接使用唯一的product_id(它可能包含store_前缀)来构建文件名 # 文件名示例: transformer_17002608_epoch_best.pth 或 transformer_store_01010023_epoch_best.pth # 针对 KAN 和 optimized_kan,使用 model_manager 的命名约定 - if model_type in ['kan', 'optimized_kan']: - # 格式: {model_type}_product_{product_id}_{version}.pth - # 注意:KAN trainer 保存时,product_id 就是 model_identifier - filename = f"{model_type}_product_{product_id}_{version}.pth" - else: - # 其他模型使用 _epoch_ 约定 - filename = f"{model_type}_{product_id}_epoch_{version}.pth" + # 统一所有模型的命名格式 + filename = f"{model_type}_product_{product_id}_{version}.pth" # 修正:直接在根模型目录查找,不再使用checkpoints子目录 return os.path.join(DEFAULT_MODEL_DIR, filename) @@ -155,67 +108,32 @@ def get_model_versions(product_id: str, model_type: str) -> list: Returns: 版本列表,按版本号排序 """ - # 直接使用传入的product_id构建搜索模式 - # 搜索模式,匹配 "transformer_product_17002608_epoch_50.pth" 或 "transformer_product_17002608_epoch_best.pth" - # 修正:直接使用唯一的product_id(它可能包含store_前缀)来构建搜索模式 - # 扩展搜索模式以兼容多种命名约定 - patterns = [ - f"{model_type}_{product_id}_epoch_*.pth", # 原始格式 (e.g., transformer_123_epoch_best.pth) - f"{model_type}_product_{product_id}_*.pth" # KAN/ModelManager格式 (e.g., kan_product_123_v1.pth) - ] + # 统一使用新的命名约定进行搜索 + pattern = os.path.join(DEFAULT_MODEL_DIR, f"{model_type}_product_{product_id}_*.pth") + existing_files = glob.glob(pattern) - existing_files = [] - for pattern in patterns: - search_path = os.path.join(DEFAULT_MODEL_DIR, pattern) - existing_files.extend(glob.glob(search_path)) - - # 旧格式(兼容性支持) - pattern_old = f"{model_type}_model_product_{product_id}.pth" - old_file_path = os.path.join(DEFAULT_MODEL_DIR, pattern_old) - if os.path.exists(old_file_path): - existing_files.append(old_file_path) - - versions = set() # 使用集合避免重复 + versions = set() - # 从找到的文件中提取版本信息 for file_path in existing_files: filename = os.path.basename(file_path) - # 尝试匹配 _epoch_ 格式 - version_match_epoch = re.search(r"_epoch_(.+)\.pth$", filename) - if version_match_epoch: - versions.add(version_match_epoch.group(1)) - continue - - # 尝试匹配 _product_..._v 格式 (KAN) - version_match_kan = re.search(r"_product_.+_v(\d+)\.pth$", filename) - if version_match_kan: - versions.add(f"v{version_match_kan.group(1)}") - continue + # 严格匹配 _v 或 'best' + match = re.search(r'_(v\d+|best)\.pth$', filename) + if match: + versions.add(match.group(1)) - # 尝试匹配旧的 _model_product_ 格式 - if pattern_old in filename: - versions.add("v1_legacy") # 添加一个特殊标识 - print(f"检测到旧格式模型文件: {old_file_path},将其视为版本 v1_legacy") - continue + # 按数字版本降序排序,'best'始终在最前 + def sort_key(v): + if v == 'best': + return -1 # 'best' is always first + if v.startswith('v'): + return int(v[1:]) + return float('inf') # Should not happen - # 转换为列表并排序 - sorted_versions = sorted(list(versions)) + sorted_versions = sorted(list(versions), key=sort_key, reverse=True) + return sorted_versions -def get_latest_model_version(product_id: str, model_type: str) -> str: - """ - 获取指定产品和模型类型的最新版本 - - Args: - product_id: 产品ID - model_type: 模型类型 - - Returns: - 最新版本号,如果没有则返回None - """ - versions = get_model_versions(product_id, model_type) - return versions[-1] if versions else None def save_model_version_info(product_id: str, model_type: str, version: str, file_path: str, metrics: dict = None): """ diff --git a/server/trainers/kan_trainer.py b/server/trainers/kan_trainer.py index 63f44b7..5b93c23 100644 --- a/server/trainers/kan_trainer.py +++ b/server/trainers/kan_trainer.py @@ -257,11 +257,11 @@ def train_product_model_with_kan(product_id, model_identifier, product_df=None, model_data=best_model_data, product_id=model_identifier, model_type=model_type_name, - version='best', store_id=store_id, training_mode=training_mode, aggregation_method=aggregation_method, - product_name=product_name + product_name=product_name, + version='best' # 显式覆盖版本为'best' ) if (epoch + 1) % 10 == 0: @@ -335,15 +335,18 @@ def train_product_model_with_kan(product_id, model_identifier, product_df=None, 'loss_curve_path': loss_curve_path } - model_path = model_manager.save_model( + # 保存最终模型,让 model_manager 自动处理版本号 + final_model_path, final_version = model_manager.save_model( model_data=model_data, product_id=model_identifier, model_type=model_type_name, - version=f'final_epoch_{epochs}', store_id=store_id, training_mode=training_mode, aggregation_method=aggregation_method, product_name=product_name + # 注意:此处不传递version参数,由管理器自动生成 ) - return model, metrics \ No newline at end of file + print(f"最终模型已保存,版本: {final_version}, 路径: {final_model_path}") + + return model, metrics \ No newline at end of file diff --git a/server/trainers/mlstm_trainer.py b/server/trainers/mlstm_trainer.py index af80a75..c6d00f7 100644 --- a/server/trainers/mlstm_trainer.py +++ b/server/trainers/mlstm_trainer.py @@ -20,85 +20,10 @@ from utils.multi_store_data_utils import get_store_product_sales_data, aggregate from utils.visualization import plot_loss_curve from analysis.metrics import evaluate_model from core.config import ( - DEVICE, DEFAULT_MODEL_DIR, LOOK_BACK, FORECAST_HORIZON, - get_next_model_version, get_model_file_path, get_latest_model_version + DEVICE, DEFAULT_MODEL_DIR, LOOK_BACK, FORECAST_HORIZON ) from utils.training_progress import progress_manager - -def save_checkpoint(checkpoint_data: dict, epoch_or_label, model_identifier: str, - model_type: str, model_dir: str, store_id=None, - training_mode: str = 'product', aggregation_method=None): - """ - 保存训练检查点 - - Args: - checkpoint_data: 检查点数据 - epoch_or_label: epoch编号或标签(如'best') - product_id: 产品ID - model_type: 模型类型 - model_dir: 模型保存目录 - store_id: 店铺ID - training_mode: 训练模式 - aggregation_method: 聚合方法 - """ - # 创建检查点目录 - # 直接在模型根目录保存,不再创建子目录 - checkpoint_dir = model_dir - os.makedirs(checkpoint_dir, exist_ok=True) - - # 修正:直接使用product_id作为唯一标识符,因为它已经包含了store_前缀或药品ID - filename = f"{model_type}_{model_identifier}_epoch_{epoch_or_label}.pth" - - checkpoint_path = os.path.join(checkpoint_dir, filename) - - # 保存检查点 - torch.save(checkpoint_data, checkpoint_path) - print(f"[mLSTM] 检查点已保存: {checkpoint_path}", flush=True) - - return checkpoint_path - - -def load_checkpoint(product_id: str, model_type: str, epoch_or_label, - model_dir: str, store_id=None, training_mode: str = 'product', - aggregation_method=None): - """ - 加载训练检查点 - - Args: - product_id: 产品ID - model_type: 模型类型 - epoch_or_label: epoch编号或标签 - model_dir: 模型保存目录 - store_id: 店铺ID - training_mode: 训练模式 - aggregation_method: 聚合方法 - - Returns: - checkpoint_data: 检查点数据,如果未找到返回None - """ - checkpoint_dir = os.path.join(model_dir, 'checkpoints') - - # 生成检查点文件名 - if training_mode == 'store' and store_id: - filename = f"{model_type}_store_{store_id}_{product_id}_epoch_{epoch_or_label}.pth" - elif training_mode == 'global' and aggregation_method: - filename = f"{model_type}_global_{product_id}_{aggregation_method}_epoch_{epoch_or_label}.pth" - else: - filename = f"{model_type}_product_{product_id}_epoch_{epoch_or_label}.pth" - - checkpoint_path = os.path.join(checkpoint_dir, filename) - - if os.path.exists(checkpoint_path): - try: - checkpoint_data = torch.load(checkpoint_path, map_location=DEVICE) - print(f"[mLSTM] 检查点已加载: {checkpoint_path}", flush=True) - return checkpoint_data - except Exception as e: - print(f"[mLSTM] 加载检查点失败: {e}", flush=True) - return None - else: - print(f"[mLSTM] 检查点文件不存在: {checkpoint_path}", flush=True) - return None +from utils.model_manager import model_manager def train_product_model_with_mlstm( product_id, @@ -173,15 +98,9 @@ def train_product_model_with_mlstm( emit_progress("开始mLSTM模型训练...") # 确定版本号 - if version is None: - if continue_training: - version = get_latest_model_version(product_id, 'mlstm') - if version is None: - version = get_next_model_version(product_id, 'mlstm') - else: - version = get_next_model_version(product_id, 'mlstm') - - emit_progress(f"开始训练 mLSTM 模型版本 {version}") + emit_progress(f"开始训练 mLSTM 模型") + if version: + emit_progress(f"使用指定版本: {version}") # 初始化训练进度管理器(如果还未初始化) if socketio and task_id: @@ -234,7 +153,7 @@ def train_product_model_with_mlstm( print(f"[mLSTM] 使用mLSTM模型训练产品 '{product_name}' (ID: {product_id}) 的销售预测模型", flush=True) print(f"[mLSTM] 训练范围: {training_scope}", flush=True) - print(f"[mLSTM] 版本: {version}", flush=True) + # print(f"[mLSTM] 版本: {version}", flush=True) # Version is now handled by model_manager print(f"[mLSTM] 使用设备: {DEVICE}", flush=True) print(f"[mLSTM] 模型将保存到目录: {model_dir}", flush=True) print(f"[mLSTM] 数据量: {len(product_df)} 条记录", flush=True) @@ -323,16 +242,8 @@ def train_product_model_with_mlstm( # 如果是继续训练,加载现有模型 if continue_training and version != 'v1': - try: - existing_model_path = get_model_file_path(product_id, 'mlstm', version) - if os.path.exists(existing_model_path): - checkpoint = torch.load(existing_model_path, map_location=DEVICE) - model.load_state_dict(checkpoint['model_state_dict']) - print(f"加载现有模型: {existing_model_path}") - emit_progress(f"加载现有模型版本 {version} 进行继续训练") - except Exception as e: - print(f"无法加载现有模型,将重新开始训练: {e}") - emit_progress("无法加载现有模型,重新开始训练") + # TODO: Implement continue_training logic with the new model_manager + pass # 将模型移动到设备上 model = model.to(DEVICE) @@ -451,21 +362,23 @@ def train_product_model_with_mlstm( } } - # 保存检查点 - save_checkpoint(checkpoint_data, epoch + 1, model_identifier, 'mlstm', - model_dir, store_id, training_mode, aggregation_method) - # 如果是最佳模型,额外保存一份 if test_loss < best_loss: best_loss = test_loss - save_checkpoint(checkpoint_data, 'best', model_identifier, 'mlstm', - model_dir, store_id, training_mode, aggregation_method) + model_manager.save_model( + model_data=checkpoint_data, + product_id=model_identifier, + model_type='mlstm', + store_id=store_id, + training_mode=training_mode, + aggregation_method=aggregation_method, + product_name=product_name, + version='best' + ) emit_progress(f"💾 保存最佳模型检查点 (epoch {epoch+1}, test_loss: {test_loss:.4f})") epochs_no_improve = 0 else: epochs_no_improve += 1 - - emit_progress(f"💾 保存训练检查点 epoch_{epoch+1}") if (epoch + 1) % 10 == 0: print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}", flush=True) @@ -524,7 +437,6 @@ def train_product_model_with_mlstm( # 计算评估指标 metrics = evaluate_model(test_true_inv, test_pred_inv) metrics['training_time'] = training_time - metrics['version'] = version # 打印评估指标 print("\n模型评估指标:") @@ -576,10 +488,15 @@ def train_product_model_with_mlstm( } } - # 保存最终模型(使用epoch标识) - final_model_path = save_checkpoint( - final_model_data, f"final_epoch_{epochs}", model_identifier, 'mlstm', - model_dir, store_id, training_mode, aggregation_method + # 保存最终模型,让 model_manager 自动处理版本号 + final_model_path, final_version = model_manager.save_model( + model_data=final_model_data, + product_id=model_identifier, + model_type='mlstm', + store_id=store_id, + training_mode=training_mode, + aggregation_method=aggregation_method, + product_name=product_name ) # 发送训练完成消息 @@ -591,9 +508,10 @@ def train_product_model_with_mlstm( 'mape': metrics['mape'], 'training_time': training_time, 'final_epoch': epochs, - 'model_path': final_model_path + 'model_path': final_model_path, + 'version': final_version } - emit_progress(f"✅ mLSTM模型训练完成!最终epoch: {epochs} 已保存", progress=100, metrics=final_metrics) + emit_progress(f"✅ mLSTM模型训练完成!版本 {final_version} 已保存", progress=100, metrics=final_metrics) - return model, metrics, epochs, final_model_path \ No newline at end of file + return model, metrics, epochs, final_model_path \ No newline at end of file diff --git a/server/trainers/tcn_trainer.py b/server/trainers/tcn_trainer.py index e8de480..c99d3af 100644 --- a/server/trainers/tcn_trainer.py +++ b/server/trainers/tcn_trainer.py @@ -20,39 +20,7 @@ from utils.visualization import plot_loss_curve from analysis.metrics import evaluate_model from core.config import DEVICE, DEFAULT_MODEL_DIR, LOOK_BACK, FORECAST_HORIZON from utils.training_progress import progress_manager - -def save_checkpoint(checkpoint_data: dict, epoch_or_label, model_identifier: str, - model_type: str, model_dir: str, store_id=None, - training_mode: str = 'product', aggregation_method=None): - """ - 保存训练检查点 - - Args: - checkpoint_data: 检查点数据 - epoch_or_label: epoch编号或标签(如'best') - product_id: 产品ID - model_type: 模型类型 - model_dir: 模型保存目录 - store_id: 店铺ID - training_mode: 训练模式 - aggregation_method: 聚合方法 - """ - # 创建检查点目录 - # 直接在模型根目录保存,不再创建子目录 - checkpoint_dir = model_dir - os.makedirs(checkpoint_dir, exist_ok=True) - - # 生成检查点文件名 - # 修正:直接使用product_id作为唯一标识符,因为它已经包含了store_前缀或药品ID - filename = f"{model_type}_{model_identifier}_epoch_{epoch_or_label}.pth" - - checkpoint_path = os.path.join(checkpoint_dir, filename) - - # 保存检查点 - torch.save(checkpoint_data, checkpoint_path) - print(f"[TCN] 检查点已保存: {checkpoint_path}", flush=True) - - return checkpoint_path +from utils.model_manager import model_manager def train_product_model_with_tcn( product_id, @@ -72,21 +40,6 @@ def train_product_model_with_tcn( ): """ 使用TCN模型训练产品销售预测模型 - - 参数: - product_id: 产品ID - epochs: 训练轮次 - model_dir: 模型保存目录,默认使用配置中的DEFAULT_MODEL_DIR - version: 指定版本号,如果为None则自动生成 - socketio: WebSocket对象,用于实时反馈 - task_id: 训练任务ID - continue_training: 是否继续训练现有模型 - - 返回: - model: 训练好的模型 - metrics: 模型评估指标 - version: 实际使用的版本号 - model_path: 模型文件路径 """ def emit_progress(message, progress=None, metrics=None): @@ -103,62 +56,21 @@ def train_product_model_with_tcn( data['metrics'] = metrics socketio.emit('training_progress', data, namespace='/training') - # 确定版本号 - if version is None: - from core.config import get_latest_model_version, get_next_model_version - if continue_training: - version = get_latest_model_version(product_id, 'tcn') - if version is None: - version = get_next_model_version(product_id, 'tcn') - else: - version = get_next_model_version(product_id, 'tcn') + emit_progress(f"开始训练 TCN 模型") - emit_progress(f"开始训练 TCN 模型版本 {version}") - - # 如果没有传入product_df,则根据训练模式加载数据 if product_df is None: - from utils.multi_store_data_utils import load_multi_store_data, get_store_product_sales_data, aggregate_multi_store_data - - try: - if training_mode == 'store' and store_id: - # 加载特定店铺的数据 - product_df = get_store_product_sales_data( - store_id, - product_id, - 'pharmacy_sales_multi_store.csv' - ) - training_scope = f"店铺 {store_id}" - elif training_mode == 'global': - # 聚合所有店铺的数据 - product_df = aggregate_multi_store_data( - product_id, - aggregation_method=aggregation_method, - file_path='pharmacy_sales_multi_store.csv' - ) - training_scope = f"全局聚合({aggregation_method})" - else: - # 默认:加载所有店铺的产品数据 - product_df = load_multi_store_data('pharmacy_sales_multi_store.csv', product_id=product_id) - training_scope = "所有店铺" - except Exception as e: - print(f"多店铺数据加载失败: {e}") - # 后备方案:尝试原始数据 - df = pd.read_excel('pharmacy_sales.xlsx') - product_df = df[df['product_id'] == product_id].sort_values('date') - training_scope = "原始数据" + from utils.multi_store_data_utils import aggregate_multi_store_data + product_df = aggregate_multi_store_data( + product_id=product_id, + aggregation_method=aggregation_method + ) + training_scope = f"全局聚合({aggregation_method})" else: - # 如果传入了product_df,直接使用 - if training_mode == 'store' and store_id: - training_scope = f"店铺 {store_id}" - elif training_mode == 'global': - training_scope = f"全局聚合({aggregation_method})" - else: - training_scope = "所有店铺" + training_scope = "所有店铺" if product_df.empty: raise ValueError(f"产品 {product_id} 没有可用的销售数据") - # 数据量检查 min_required_samples = sequence_length + forecast_horizon if len(product_df) < min_required_samples: error_msg = ( @@ -166,10 +78,6 @@ def train_product_model_with_tcn( f"当前配置需要: {min_required_samples} 天数据 (LOOK_BACK={sequence_length} + FORECAST_HORIZON={forecast_horizon})\n" f"实际数据量: {len(product_df)} 天\n" f"产品ID: {product_id}, 训练模式: {training_mode}\n" - f"建议解决方案:\n" - f"1. 生成更多数据: uv run generate_multi_store_data.py\n" - f"2. 调整配置参数: 减小 LOOK_BACK 或 FORECAST_HORIZON\n" - f"3. 使用全局训练模式聚合更多数据" ) print(error_msg) emit_progress(f"训练失败:数据不足 ({len(product_df)}/{min_required_samples} 天)") @@ -180,48 +88,39 @@ def train_product_model_with_tcn( print(f"使用TCN模型训练产品 '{product_name}' (ID: {product_id}) 的销售预测模型") print(f"训练范围: {training_scope}") - print(f"版本: {version}") print(f"使用设备: {DEVICE}") print(f"模型将保存到目录: {model_dir}") emit_progress(f"训练产品: {product_name} (ID: {product_id})") - # 创建特征和目标变量 features = ['sales', 'weekday', 'month', 'is_holiday', 'is_weekend', 'is_promotion', 'temperature'] - # 预处理数据 X = product_df[features].values - y = product_df[['sales']].values # 保持为二维数组 + y = product_df[['sales']].values - # 设置数据预处理阶段 progress_manager.set_stage("data_preprocessing", 0) emit_progress("数据预处理中...") - # 归一化数据 scaler_X = MinMaxScaler(feature_range=(0, 1)) scaler_y = MinMaxScaler(feature_range=(0, 1)) X_scaled = scaler_X.fit_transform(X) y_scaled = scaler_y.fit_transform(y) - # 划分训练集和测试集(80% 训练,20% 测试) train_size = int(len(X_scaled) * 0.8) X_train, X_test = X_scaled[:train_size], X_scaled[train_size:] y_train, y_test = y_scaled[:train_size], y_scaled[train_size:] progress_manager.set_stage("data_preprocessing", 50) - # 创建时间序列数据 trainX, trainY = create_dataset(X_train, y_train, sequence_length, forecast_horizon) testX, testY = create_dataset(X_test, y_test, sequence_length, forecast_horizon) - # 转换为PyTorch的Tensor trainX_tensor = torch.Tensor(trainX) trainY_tensor = torch.Tensor(trainY) testX_tensor = torch.Tensor(testX) testY_tensor = torch.Tensor(testY) - # 创建数据加载器 train_dataset = PharmacyDataset(trainX_tensor, trainY_tensor) test_dataset = PharmacyDataset(testX_tensor, testY_tensor) @@ -229,7 +128,6 @@ def train_product_model_with_tcn( train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False) - # 更新进度管理器的批次信息 total_batches = len(train_loader) total_samples = len(train_dataset) progress_manager.total_batches_per_epoch = total_batches @@ -238,7 +136,6 @@ def train_product_model_with_tcn( progress_manager.set_stage("data_preprocessing", 100) - # 初始化TCN模型 input_dim = X_train.shape[1] output_dim = forecast_horizon hidden_size = 64 @@ -254,21 +151,8 @@ def train_product_model_with_tcn( dropout=dropout_rate ) - # 如果是继续训练,加载现有模型 - if continue_training and version != 'v1': - try: - from core.config import get_model_file_path - existing_model_path = get_model_file_path(product_id, 'tcn', version) - if os.path.exists(existing_model_path): - checkpoint = torch.load(existing_model_path, map_location=DEVICE) - model.load_state_dict(checkpoint['model_state_dict']) - print(f"加载现有模型: {existing_model_path}") - emit_progress(f"加载现有模型版本 {version} 进行继续训练") - except Exception as e: - print(f"无法加载现有模型,将重新开始训练: {e}") - emit_progress("无法加载现有模型,重新开始训练") + # TODO: Implement continue_training logic with the new model_manager - # 将模型移动到设备上 model = model.to(DEVICE) criterion = nn.MSELoss() @@ -276,20 +160,17 @@ def train_product_model_with_tcn( emit_progress("开始模型训练...") - # 训练模型 train_losses = [] test_losses = [] start_time = time.time() - # 配置检查点保存 - checkpoint_interval = max(1, epochs // 10) # 每10%进度保存一次,最少每1个epoch + checkpoint_interval = max(1, epochs // 10) best_loss = float('inf') progress_manager.set_stage("model_training", 0) emit_progress(f"开始训练 - 总epoch: {epochs}, 检查点间隔: {checkpoint_interval}") for epoch in range(epochs): - # 开始新的轮次 progress_manager.start_epoch(epoch) model.train() @@ -298,43 +179,34 @@ def train_product_model_with_tcn( for batch_idx, (X_batch, y_batch) in enumerate(train_loader): X_batch, y_batch = X_batch.to(DEVICE), y_batch.to(DEVICE) - # 确保目标张量有正确的形状 (batch_size, forecast_horizon, 1) if y_batch.dim() == 2: y_batch = y_batch.unsqueeze(-1) - # 前向传播 outputs = model(X_batch) - # 确保输出和目标形状匹配 loss = criterion(outputs, y_batch) - # 反向传播和优化 optimizer.zero_grad() loss.backward() optimizer.step() epoch_loss += loss.item() - # 更新批次进度(每10个批次更新一次) if batch_idx % 10 == 0 or batch_idx == len(train_loader) - 1: current_lr = optimizer.param_groups[0]['lr'] progress_manager.update_batch(batch_idx, loss.item(), current_lr) - # 计算训练损失 train_loss = epoch_loss / len(train_loader) train_losses.append(train_loss) - # 设置验证阶段 progress_manager.set_stage("validation", 0) - # 在测试集上评估 model.eval() test_loss = 0 with torch.no_grad(): for batch_idx, (X_batch, y_batch) in enumerate(test_loader): X_batch, y_batch = X_batch.to(DEVICE), y_batch.to(DEVICE) - # 确保目标张量有正确的形状 if y_batch.dim() == 2: y_batch = y_batch.unsqueeze(-1) @@ -342,7 +214,6 @@ def train_product_model_with_tcn( loss = criterion(outputs, y_batch) test_loss += loss.item() - # 更新验证进度 if batch_idx % 5 == 0 or batch_idx == len(test_loader) - 1: val_progress = (batch_idx / len(test_loader)) * 100 progress_manager.set_stage("validation", val_progress) @@ -350,10 +221,8 @@ def train_product_model_with_tcn( test_loss = test_loss / len(test_loader) test_losses.append(test_loss) - # 完成当前轮次 progress_manager.finish_epoch(train_loss, test_loss) - # 发送训练进度(保持与旧系统的兼容性) if (epoch + 1) % 5 == 0 or epoch == epochs - 1: progress = ((epoch + 1) / epochs) * 100 current_metrics = { @@ -365,7 +234,6 @@ def train_product_model_with_tcn( emit_progress(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}", progress=progress, metrics=current_metrics) - # 定期保存检查点 if (epoch + 1) % checkpoint_interval == 0 or epoch == epochs - 1: checkpoint_data = { 'epoch': epoch + 1, @@ -399,30 +267,28 @@ def train_product_model_with_tcn( } } - # 保存检查点 - save_checkpoint(checkpoint_data, epoch + 1, model_identifier, 'tcn', - model_dir, store_id, training_mode, aggregation_method) - - # 如果是最佳模型,额外保存一份 if test_loss < best_loss: best_loss = test_loss - save_checkpoint(checkpoint_data, 'best', model_identifier, 'tcn', - model_dir, store_id, training_mode, aggregation_method) + model_manager.save_model( + model_data=checkpoint_data, + product_id=model_identifier, + model_type='tcn', + store_id=store_id, + training_mode=training_mode, + aggregation_method=aggregation_method, + product_name=product_name, + version='best' + ) emit_progress(f"💾 保存最佳模型检查点 (epoch {epoch+1}, test_loss: {test_loss:.4f})") - - emit_progress(f"💾 保存训练检查点 epoch_{epoch+1}") if (epoch + 1) % 10 == 0: print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}") - # 计算训练时间 training_time = time.time() - start_time - # 设置模型保存阶段 progress_manager.set_stage("model_saving", 0) emit_progress("训练完成,正在保存模型...") - # 绘制损失曲线并保存到模型目录 loss_curve_path = plot_loss_curve( train_losses, test_losses, @@ -432,23 +298,17 @@ def train_product_model_with_tcn( ) print(f"损失曲线已保存到: {loss_curve_path}") - # 评估模型 model.eval() with torch.no_grad(): - # 确保测试数据的形状正确 test_pred = model(testX_tensor.to(DEVICE)) - # 将输出转换为二维数组 [samples, forecast_horizon] test_pred = test_pred.squeeze(-1).cpu().numpy() - # 反归一化预测结果和真实值 test_pred_inv = scaler_y.inverse_transform(test_pred.reshape(-1, 1)).flatten() test_true_inv = scaler_y.inverse_transform(testY.reshape(-1, 1)).flatten() - # 计算评估指标 metrics = evaluate_model(test_true_inv, test_pred_inv) metrics['training_time'] = training_time - # 打印评估指标 print("\n模型评估指标:") print(f"MSE: {metrics['mse']:.4f}") print(f"RMSE: {metrics['rmse']:.4f}") @@ -457,9 +317,8 @@ def train_product_model_with_tcn( print(f"MAPE: {metrics['mape']:.2f}%") print(f"训练时间: {training_time:.2f}秒") - # 保存最终训练完成的模型(基于最终epoch) final_model_data = { - 'epoch': epochs, # 最终epoch + 'epoch': epochs, 'model_state_dict': model.state_dict(), 'optimizer_state_dict': optimizer.state_dict(), 'train_loss': train_losses[-1], @@ -495,10 +354,14 @@ def train_product_model_with_tcn( progress_manager.set_stage("model_saving", 50) - # 保存最终模型(使用epoch标识) - final_model_path = save_checkpoint( - final_model_data, f"final_epoch_{epochs}", model_identifier, 'tcn', - model_dir, store_id, training_mode, aggregation_method + final_model_path, final_version = model_manager.save_model( + model_data=final_model_data, + product_id=model_identifier, + model_type='tcn', + store_id=store_id, + training_mode=training_mode, + aggregation_method=aggregation_method, + product_name=product_name ) progress_manager.set_stage("model_saving", 100) @@ -510,9 +373,10 @@ def train_product_model_with_tcn( 'r2': metrics['r2'], 'mape': metrics['mape'], 'training_time': training_time, - 'final_epoch': epochs + 'final_epoch': epochs, + 'version': final_version } - emit_progress(f"模型训练完成!最终epoch: {epochs}", progress=100, metrics=final_metrics) + emit_progress(f"模型训练完成!版本 {final_version} 已保存", progress=100, metrics=final_metrics) - return model, metrics, epochs, final_model_path \ No newline at end of file + return model, metrics, epochs, final_model_path \ No newline at end of file diff --git a/server/trainers/transformer_trainer.py b/server/trainers/transformer_trainer.py index a574909..325a281 100644 --- a/server/trainers/transformer_trainer.py +++ b/server/trainers/transformer_trainer.py @@ -21,43 +21,11 @@ from utils.multi_store_data_utils import get_store_product_sales_data, aggregate from utils.visualization import plot_loss_curve from analysis.metrics import evaluate_model from core.config import ( - DEVICE, DEFAULT_MODEL_DIR, LOOK_BACK, FORECAST_HORIZON, - get_next_model_version, get_model_file_path, get_latest_model_version + DEVICE, DEFAULT_MODEL_DIR, LOOK_BACK, FORECAST_HORIZON ) from utils.training_progress import progress_manager from utils.model_manager import model_manager -def save_checkpoint(checkpoint_data: dict, epoch_or_label, model_identifier: str, - model_type: str, model_dir: str, store_id=None, - training_mode: str = 'product', aggregation_method=None): - """ - 保存训练检查点 - - Args: - checkpoint_data: 检查点数据 - epoch_or_label: epoch编号或标签(如'best') - product_id: 产品ID - model_type: 模型类型 - model_dir: 模型保存目录 - store_id: 店铺ID - training_mode: 训练模式 - aggregation_method: 聚合方法 - """ - # 直接在模型根目录保存,不再创建子目录 - checkpoint_dir = model_dir - os.makedirs(checkpoint_dir, exist_ok=True) - - # 修正:直接使用product_id作为唯一标识符,因为它已经包含了store_前缀或药品ID - filename = f"{model_type}_{model_identifier}_epoch_{epoch_or_label}.pth" - - checkpoint_path = os.path.join(checkpoint_dir, filename) - - # 保存检查点 - torch.save(checkpoint_data, checkpoint_path) - print(f"[Transformer] 检查点已保存: {checkpoint_path}", flush=True) - - return checkpoint_path - def train_product_model_with_transformer( product_id, model_identifier, @@ -79,23 +47,8 @@ def train_product_model_with_transformer( ): """ 使用Transformer模型训练产品销售预测模型 - - 参数: - product_id: 产品ID - epochs: 训练轮次 - model_dir: 模型保存目录,默认使用配置中的DEFAULT_MODEL_DIR - version: 指定版本号,如果为None则自动生成 - socketio: WebSocket对象,用于实时反馈 - task_id: 训练任务ID - continue_training: 是否继续训练现有模型 - - 返回: - model: 训练好的模型 - metrics: 模型评估指标 - version: 实际使用的版本号 """ - # WebSocket进度反馈函数 def emit_progress(message, progress=None, metrics=None): """发送训练进度到前端""" if socketio and task_id: @@ -110,18 +63,15 @@ def train_product_model_with_transformer( data['metrics'] = metrics socketio.emit('training_progress', data, namespace='/training') print(f"[{time.strftime('%H:%M:%S')}] {message}", flush=True) - # 强制刷新输出缓冲区 import sys sys.stdout.flush() sys.stderr.flush() emit_progress("开始Transformer模型训练...") - # 获取训练进度管理器实例 try: from utils.training_progress import progress_manager except ImportError: - # 如果无法导入,创建一个空的管理器以避免错误 class DummyProgressManager: def set_stage(self, *args, **kwargs): pass def start_training(self, *args, **kwargs): pass @@ -131,50 +81,19 @@ def train_product_model_with_transformer( def finish_training(self, *args, **kwargs): pass progress_manager = DummyProgressManager() - # 如果没有传入product_df,则根据训练模式加载数据 if product_df is None: - from utils.multi_store_data_utils import load_multi_store_data, get_store_product_sales_data, aggregate_multi_store_data - - try: - if training_mode == 'store' and store_id: - # 加载特定店铺的数据 - product_df = get_store_product_sales_data( - store_id, - product_id, - 'pharmacy_sales_multi_store.csv' - ) - training_scope = f"店铺 {store_id}" - elif training_mode == 'global': - # 聚合所有店铺的数据 - product_df = aggregate_multi_store_data( - product_id, - aggregation_method=aggregation_method, - file_path='pharmacy_sales_multi_store.csv' - ) - training_scope = f"全局聚合({aggregation_method})" - else: - # 默认:加载所有店铺的产品数据 - product_df = load_multi_store_data('pharmacy_sales_multi_store.csv', product_id=product_id) - training_scope = "所有店铺" - except Exception as e: - print(f"多店铺数据加载失败: {e}") - # 后备方案:尝试原始数据 - df = pd.read_excel('pharmacy_sales.xlsx') - product_df = df[df['product_id'] == product_id].sort_values('date') - training_scope = "原始数据" + from utils.multi_store_data_utils import aggregate_multi_store_data + product_df = aggregate_multi_store_data( + product_id=product_id, + aggregation_method=aggregation_method + ) + training_scope = f"全局聚合({aggregation_method})" else: - # 如果传入了product_df,直接使用 - if training_mode == 'store' and store_id: - training_scope = f"店铺 {store_id}" - elif training_mode == 'global': - training_scope = f"全局聚合({aggregation_method})" - else: - training_scope = "所有店铺" + training_scope = "所有店铺" if product_df.empty: raise ValueError(f"产品 {product_id} 没有可用的销售数据") - # 数据量检查 min_required_samples = sequence_length + forecast_horizon if len(product_df) < min_required_samples: error_msg = ( @@ -182,10 +101,6 @@ def train_product_model_with_transformer( f"当前配置需要: {min_required_samples} 天数据 (LOOK_BACK={sequence_length} + FORECAST_HORIZON={forecast_horizon})\n" f"实际数据量: {len(product_df)} 天\n" f"产品ID: {product_id}, 训练模式: {training_mode}\n" - f"建议解决方案:\n" - f"1. 生成更多数据: uv run generate_multi_store_data.py\n" - f"2. 调整配置参数: 减小 LOOK_BACK 或 FORECAST_HORIZON\n" - f"3. 使用全局训练模式聚合更多数据" ) print(error_msg) raise ValueError(error_msg) @@ -197,18 +112,14 @@ def train_product_model_with_transformer( print(f"[Device] 使用设备: {DEVICE}", flush=True) print(f"[Model] 模型将保存到目录: {model_dir}", flush=True) - # 创建特征和目标变量 features = ['sales', 'weekday', 'month', 'is_holiday', 'is_weekend', 'is_promotion', 'temperature'] - # 设置数据预处理阶段 progress_manager.set_stage("data_preprocessing", 0) emit_progress("数据预处理中...") - # 预处理数据 X = product_df[features].values - y = product_df[['sales']].values # 保持为二维数组 + y = product_df[['sales']].values - # 归一化数据 scaler_X = MinMaxScaler(feature_range=(0, 1)) scaler_y = MinMaxScaler(feature_range=(0, 1)) @@ -217,24 +128,20 @@ def train_product_model_with_transformer( progress_manager.set_stage("data_preprocessing", 40) - # 划分训练集和测试集(80% 训练,20% 测试) train_size = int(len(X_scaled) * 0.8) X_train, X_test = X_scaled[:train_size], X_scaled[train_size:] y_train, y_test = y_scaled[:train_size], y_scaled[train_size:] - # 创建时间序列数据 trainX, trainY = create_dataset(X_train, y_train, sequence_length, forecast_horizon) testX, testY = create_dataset(X_test, y_test, sequence_length, forecast_horizon) progress_manager.set_stage("data_preprocessing", 70) - # 转换为PyTorch的Tensor trainX_tensor = torch.Tensor(trainX) trainY_tensor = torch.Tensor(trainY) testX_tensor = torch.Tensor(testX) testY_tensor = torch.Tensor(testY) - # 创建数据加载器 train_dataset = PharmacyDataset(trainX_tensor, trainY_tensor) test_dataset = PharmacyDataset(testX_tensor, testY_tensor) @@ -242,7 +149,6 @@ def train_product_model_with_transformer( train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False) - # 更新进度管理器的批次信息 total_batches = len(train_loader) total_samples = len(train_dataset) progress_manager.total_batches_per_epoch = total_batches @@ -252,7 +158,6 @@ def train_product_model_with_transformer( progress_manager.set_stage("data_preprocessing", 100) emit_progress("数据预处理完成,开始模型训练...") - # 初始化Transformer模型 input_dim = X_train.shape[1] output_dim = forecast_horizon hidden_size = 64 @@ -272,20 +177,17 @@ def train_product_model_with_transformer( batch_size=batch_size ) - # 将模型移动到设备上 model = model.to(DEVICE) criterion = nn.MSELoss() optimizer = optim.Adam(model.parameters(), lr=learning_rate) scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=patience // 2, factor=0.5) - # 训练模型 train_losses = [] test_losses = [] start_time = time.time() - # 配置检查点保存 - checkpoint_interval = max(1, epochs // 10) # 每10%进度保存一次,最少每1个epoch + checkpoint_interval = max(1, epochs // 10) best_loss = float('inf') epochs_no_improve = 0 @@ -293,7 +195,6 @@ def train_product_model_with_transformer( emit_progress(f"开始训练 - 总epoch: {epochs}, 检查点间隔: {checkpoint_interval}, 耐心值: {patience}") for epoch in range(epochs): - # 开始新的轮次 progress_manager.start_epoch(epoch) model.train() @@ -302,12 +203,9 @@ def train_product_model_with_transformer( for batch_idx, (X_batch, y_batch) in enumerate(train_loader): X_batch, y_batch = X_batch.to(DEVICE), y_batch.to(DEVICE) - # 确保目标张量有正确的形状 - # 前向传播 outputs = model(X_batch) loss = criterion(outputs, y_batch) - # 反向传播和优化 optimizer.zero_grad() loss.backward() if clip_norm: @@ -316,31 +214,25 @@ def train_product_model_with_transformer( epoch_loss += loss.item() - # 更新批次进度 if batch_idx % 5 == 0 or batch_idx == len(train_loader) - 1: current_lr = optimizer.param_groups[0]['lr'] progress_manager.update_batch(batch_idx, loss.item(), current_lr) - # 计算训练损失 train_loss = epoch_loss / len(train_loader) train_losses.append(train_loss) - # 设置验证阶段 progress_manager.set_stage("validation", 0) - # 在测试集上评估 model.eval() test_loss = 0 with torch.no_grad(): for batch_idx, (X_batch, y_batch) in enumerate(test_loader): X_batch, y_batch = X_batch.to(DEVICE), y_batch.to(DEVICE) - # 确保目标张量有正确的形状 outputs = model(X_batch) loss = criterion(outputs, y_batch) test_loss += loss.item() - # 更新验证进度 if batch_idx % 3 == 0 or batch_idx == len(test_loader) - 1: val_progress = (batch_idx / len(test_loader)) * 100 progress_manager.set_stage("validation", val_progress) @@ -348,13 +240,10 @@ def train_product_model_with_transformer( test_loss = test_loss / len(test_loader) test_losses.append(test_loss) - # 更新学习率 scheduler.step(test_loss) - # 完成当前轮次 progress_manager.finish_epoch(train_loss, test_loss) - # 发送训练进度 if (epoch + 1) % 5 == 0 or epoch == epochs - 1: progress = ((epoch + 1) / epochs) * 100 current_metrics = { @@ -366,7 +255,6 @@ def train_product_model_with_transformer( emit_progress(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}", progress=progress, metrics=current_metrics) - # 定期保存检查点 if (epoch + 1) % checkpoint_interval == 0 or epoch == epochs - 1: checkpoint_data = { 'epoch': epoch + 1, @@ -399,38 +287,35 @@ def train_product_model_with_transformer( } } - # 保存检查点 - save_checkpoint(checkpoint_data, epoch + 1, model_identifier, 'transformer', - model_dir, store_id, training_mode, aggregation_method) - - # 如果是最佳模型,额外保存一份 if test_loss < best_loss: best_loss = test_loss - save_checkpoint(checkpoint_data, 'best', model_identifier, 'transformer', - model_dir, store_id, training_mode, aggregation_method) + model_manager.save_model( + model_data=checkpoint_data, + product_id=model_identifier, + model_type='transformer', + store_id=store_id, + training_mode=training_mode, + aggregation_method=aggregation_method, + product_name=product_name, + version='best' + ) emit_progress(f"💾 保存最佳模型检查点 (epoch {epoch+1}, test_loss: {test_loss:.4f})") epochs_no_improve = 0 else: epochs_no_improve += 1 - - emit_progress(f"💾 保存训练检查点 epoch_{epoch+1}") if (epoch + 1) % 10 == 0: print(f"📊 Epoch {epoch+1}/{epochs}, 训练损失: {train_loss:.4f}, 测试损失: {test_loss:.4f}", flush=True) - # 提前停止逻辑 if epochs_no_improve >= patience: emit_progress(f"连续 {patience} 个epoch测试损失未改善,提前停止训练。") break - # 计算训练时间 training_time = time.time() - start_time - # 设置模型保存阶段 progress_manager.set_stage("model_saving", 0) emit_progress("训练完成,正在保存模型...") - # 绘制损失曲线并保存到模型目录 loss_curve_path = plot_loss_curve( train_losses, test_losses, @@ -440,21 +325,17 @@ def train_product_model_with_transformer( ) print(f"📈 损失曲线已保存到: {loss_curve_path}", flush=True) - # 评估模型 model.eval() with torch.no_grad(): test_pred = model(testX_tensor.to(DEVICE)).cpu().numpy() test_true = testY - # 反归一化预测结果和真实值 test_pred_inv = scaler_y.inverse_transform(test_pred) test_true_inv = scaler_y.inverse_transform(test_true) - # 计算评估指标 metrics = evaluate_model(test_true_inv, test_pred_inv) metrics['training_time'] = training_time - # 打印评估指标 print(f"\n📊 模型评估指标:", flush=True) print(f" MSE: {metrics['mse']:.4f}", flush=True) print(f" RMSE: {metrics['rmse']:.4f}", flush=True) @@ -463,9 +344,8 @@ def train_product_model_with_transformer( print(f" MAPE: {metrics['mape']:.2f}%", flush=True) print(f" ⏱️ 训练时间: {training_time:.2f}秒", flush=True) - # 保存最终训练完成的模型(基于最终epoch) final_model_data = { - 'epoch': epochs, # 最终epoch + 'epoch': epochs, 'model_state_dict': model.state_dict(), 'optimizer_state_dict': optimizer.state_dict(), 'train_loss': train_losses[-1], @@ -500,10 +380,14 @@ def train_product_model_with_transformer( progress_manager.set_stage("model_saving", 50) - # 保存最终模型(使用epoch标识) - final_model_path = save_checkpoint( - final_model_data, f"final_epoch_{epochs}", model_identifier, 'transformer', - model_dir, store_id, training_mode, aggregation_method + final_model_path, final_version = model_manager.save_model( + model_data=final_model_data, + product_id=model_identifier, + model_type='transformer', + store_id=store_id, + training_mode=training_mode, + aggregation_method=aggregation_method, + product_name=product_name ) progress_manager.set_stage("model_saving", 100) @@ -511,7 +395,6 @@ def train_product_model_with_transformer( print(f"💾 模型已保存到 {final_model_path}", flush=True) - # 准备最终返回的指标 final_metrics = { 'mse': metrics['mse'], 'rmse': metrics['rmse'], @@ -519,7 +402,8 @@ def train_product_model_with_transformer( 'r2': metrics['r2'], 'mape': metrics['mape'], 'training_time': training_time, - 'final_epoch': epochs + 'final_epoch': epochs, + 'version': final_version } - return model, final_metrics, epochs \ No newline at end of file + return model, final_metrics, epochs \ No newline at end of file diff --git a/server/utils/model_manager.py b/server/utils/model_manager.py index 7f50ce2..dc13298 100644 --- a/server/utils/model_manager.py +++ b/server/utils/model_manager.py @@ -8,6 +8,7 @@ import json import torch import glob from datetime import datetime +import re from typing import List, Dict, Optional, Tuple from core.config import DEFAULT_MODEL_DIR @@ -24,9 +25,30 @@ class ModelManager: if not os.path.exists(self.model_dir): os.makedirs(self.model_dir) - def generate_model_filename(self, - product_id: str, - model_type: str, + def _get_next_version(self, product_id: str, model_type: str, store_id: Optional[str] = None, training_mode: str = 'product') -> int: + """获取下一个模型版本号 (纯数字)""" + search_pattern = self.generate_model_filename( + product_id=product_id, + model_type=model_type, + version='v*', + store_id=store_id, + training_mode=training_mode + ) + + full_search_path = os.path.join(self.model_dir, search_pattern) + existing_files = glob.glob(full_search_path) + + max_version = 0 + for f in existing_files: + match = re.search(r'_v(\d+)\.pth$', os.path.basename(f)) + if match: + max_version = max(max_version, int(match.group(1))) + + return max_version + 1 + + def generate_model_filename(self, + product_id: str, + model_type: str, version: str, store_id: Optional[str] = None, training_mode: str = 'product', @@ -36,7 +58,7 @@ class ModelManager: 格式规范: - 产品模式: {model_type}_product_{product_id}_{version}.pth - - 店铺模式: {model_type}_store_{store_id}_{product_id}_{version}.pth + - 店铺模式: {model_type}_store_{store_id}_{product_id}_{version}.pth - 全局模式: {model_type}_global_{product_id}_{aggregation_method}_{version}.pth """ if training_mode == 'store' and store_id: @@ -47,33 +69,33 @@ class ModelManager: # 默认产品模式 return f"{model_type}_product_{product_id}_{version}.pth" - def save_model(self, + def save_model(self, model_data: dict, product_id: str, - model_type: str, - version: str, + model_type: str, store_id: Optional[str] = None, training_mode: str = 'product', aggregation_method: Optional[str] = None, - product_name: Optional[str] = None) -> str: + product_name: Optional[str] = None, + version: Optional[str] = None) -> Tuple[str, str]: """ - 保存模型到统一位置 + 保存模型到统一位置,并自动管理版本。 参数: - model_data: 包含模型状态和配置的字典 - product_id: 产品ID - model_type: 模型类型 - version: 版本号 - store_id: 店铺ID (可选) - training_mode: 训练模式 - aggregation_method: 聚合方法 (可选) - product_name: 产品名称 (可选) + ... + version: (可选) 如果提供,则覆盖自动版本控制 (如 'best')。 返回: - 模型文件路径 + (模型文件路径, 使用的版本号) """ + if version is None: + next_version_num = self._get_next_version(product_id, model_type, store_id, training_mode) + version_str = f"v{next_version_num}" + else: + version_str = version + filename = self.generate_model_filename( - product_id, model_type, version, store_id, training_mode, aggregation_method + product_id, model_type, version_str, store_id, training_mode, aggregation_method ) # 统一保存到根目录,避免复杂的子目录结构 @@ -86,7 +108,7 @@ class ModelManager: 'product_id': product_id, 'product_name': product_name or product_id, 'model_type': model_type, - 'version': version, + 'version': version_str, 'store_id': store_id, 'training_mode': training_mode, 'aggregation_method': aggregation_method, @@ -99,7 +121,7 @@ class ModelManager: torch.save(enhanced_model_data, model_path) print(f"模型已保存: {model_path}") - return model_path + return model_path, version_str def list_models(self, product_id: Optional[str] = None,