ShopTRAINING/server/utils/new_data_loader.py

86 lines
3.3 KiB
Python
Raw Permalink Normal View History

2025-07-25 18:42:58 +08:00
import pandas as pd
import os
def load_new_data(file_path='data/old_5shops_50skus.parquet'):
"""
加载并适配新的Parquet数据文件为现有系统提供兼容的数据格式
核心原则:
1. 保证新数据的完整性不丢弃任何原始特征
2. 优先适配新数据通过重命名和创建代理列来兼容旧代码
参数:
file_path (str): 新数据文件的路径
返回:
pandas.DataFrame: 经过适配处理的包含所有原始特征的DataFrame
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"数据文件不存在: {file_path}")
print(f"正在从 {file_path} 加载新数据...")
df = pd.read_parquet(file_path)
print("数据加载完成,开始进行适配处理...")
# 创建一个副本以进行修改保留原始df的纯净
df_adapted = df.copy()
# --- 1. 列名映射 (适配旧代码的命名习惯) ---
# 步骤1.1: 安全地删除新数据中多余的 'date' 列,以 'kdrq' 为准
if 'date' in df_adapted.columns and 'kdrq' in df.columns:
df_adapted.drop(columns=['date'], inplace=True)
print("已删除新数据中多余的 'date' 列,以 'kdrq' 为准。")
rename_map = {
'subbh': 'store_id',
'hh': 'product_id',
'kdrq': 'date', # 现在可以安全地将 kdrq 重命名为 date
'net_sales_quantity': 'sales', # 将目标变量映射为 'sales'
'temperature_2m_mean': 'temperature',
'day_of_week': 'weekday'
}
df_adapted.rename(columns=rename_map, inplace=True)
print(f"已完成列名映射: {list(rename_map.keys())} -> {list(rename_map.values())}")
# --- 2. 数据类型转换 ---
# 将 'date' 列转换为标准的datetime对象
df_adapted['date'] = pd.to_datetime(df_adapted['date'])
print("已将 'date' 列转换为 datetime 类型。")
# --- 3. 关键特征工程 (创建代理列) ---
# 现有模型依赖 'is_promotion' 和 'is_weekend' 特征。
# 'is_weekend' 在新数据中已存在,无需处理。
# 'is_promotion' 在新数据中不存在创建一个默认值为0的代理列。
if 'is_promotion' not in df_adapted.columns:
df_adapted['is_promotion'] = 0
print("创建了代理列 'is_promotion' 并填充默认值 0。")
# 确保 'month' 列存在,如果不存在则从日期中提取
if 'month' not in df_adapted.columns and 'date' in df_adapted.columns:
df_adapted['month'] = df_adapted['date'].dt.month
print("'date' 列中提取并创建了 'month' 列。")
print("数据适配处理完成。")
# 返回包含所有列的适配后DataFrame
return df_adapted
if __name__ == '__main__':
# 用于直接运行此脚本进行测试
print("--- 测试数据加载器 ---")
try:
adapted_df = load_new_data()
print("\n--- 适配后数据信息 ---")
adapted_df.info()
print("\n--- 检查关键列 ---")
key_cols = [
'store_id', 'product_id', 'date', 'sales',
'temperature', 'weekday', 'is_promotion', 'month'
]
print(adapted_df[key_cols].head())
print(f"\n测试成功适配后的DataFrame包含 {len(adapted_df.columns)} 列。")
except Exception as e:
print(f"\n测试失败: {e}")