ShopTRAINING/server/utils/new_data_loader.py
2025-07-25 18:42:58 +08:00

86 lines
3.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import os
def load_new_data(file_path='data/old_5shops_50skus.parquet'):
"""
加载并适配新的Parquet数据文件为现有系统提供兼容的数据格式。
核心原则:
1. 保证新数据的完整性,不丢弃任何原始特征。
2. 优先适配新数据,通过重命名和创建代理列来兼容旧代码。
参数:
file_path (str): 新数据文件的路径。
返回:
pandas.DataFrame: 经过适配处理的、包含所有原始特征的DataFrame。
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"数据文件不存在: {file_path}")
print(f"正在从 {file_path} 加载新数据...")
df = pd.read_parquet(file_path)
print("数据加载完成,开始进行适配处理...")
# 创建一个副本以进行修改保留原始df的纯净
df_adapted = df.copy()
# --- 1. 列名映射 (适配旧代码的命名习惯) ---
# 步骤1.1: 安全地删除新数据中多余的 'date' 列,以 'kdrq' 为准
if 'date' in df_adapted.columns and 'kdrq' in df.columns:
df_adapted.drop(columns=['date'], inplace=True)
print("已删除新数据中多余的 'date' 列,以 'kdrq' 为准。")
rename_map = {
'subbh': 'store_id',
'hh': 'product_id',
'kdrq': 'date', # 现在可以安全地将 kdrq 重命名为 date
'net_sales_quantity': 'sales', # 将目标变量映射为 'sales'
'temperature_2m_mean': 'temperature',
'day_of_week': 'weekday'
}
df_adapted.rename(columns=rename_map, inplace=True)
print(f"已完成列名映射: {list(rename_map.keys())} -> {list(rename_map.values())}")
# --- 2. 数据类型转换 ---
# 将 'date' 列转换为标准的datetime对象
df_adapted['date'] = pd.to_datetime(df_adapted['date'])
print("已将 'date' 列转换为 datetime 类型。")
# --- 3. 关键特征工程 (创建代理列) ---
# 现有模型依赖 'is_promotion' 和 'is_weekend' 特征。
# 'is_weekend' 在新数据中已存在,无需处理。
# 'is_promotion' 在新数据中不存在创建一个默认值为0的代理列。
if 'is_promotion' not in df_adapted.columns:
df_adapted['is_promotion'] = 0
print("创建了代理列 'is_promotion' 并填充默认值 0。")
# 确保 'month' 列存在,如果不存在则从日期中提取
if 'month' not in df_adapted.columns and 'date' in df_adapted.columns:
df_adapted['month'] = df_adapted['date'].dt.month
print("'date' 列中提取并创建了 'month' 列。")
print("数据适配处理完成。")
# 返回包含所有列的适配后DataFrame
return df_adapted
if __name__ == '__main__':
# 用于直接运行此脚本进行测试
print("--- 测试数据加载器 ---")
try:
adapted_df = load_new_data()
print("\n--- 适配后数据信息 ---")
adapted_df.info()
print("\n--- 检查关键列 ---")
key_cols = [
'store_id', 'product_id', 'date', 'sales',
'temperature', 'weekday', 'is_promotion', 'month'
]
print(adapted_df[key_cols].head())
print(f"\n测试成功适配后的DataFrame包含 {len(adapted_df.columns)} 列。")
except Exception as e:
print(f"\n测试失败: {e}")