ShopTRAINING/server/utils/multi_store_data_utils.py

439 lines
16 KiB
Python
Raw Normal View History

2025-07-02 11:05:23 +08:00
"""
多店铺销售预测系统 - 数据处理工具函数
支持多店铺数据的加载过滤和处理
"""
import pandas as pd
import numpy as np
import os
from datetime import datetime, timedelta
from typing import Optional, List, Tuple, Dict, Any
def load_multi_store_data(file_path: str = 'data/timeseries_training_data_sample_10s50p.parquet',
2025-07-02 11:05:23 +08:00
store_id: Optional[str] = None,
product_id: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None) -> pd.DataFrame:
"""
加载多店铺销售数据支持按店铺产品时间范围过滤
参数:
file_path: 数据文件路径 (支持 .csv, .xlsx, .parquet)
2025-07-02 11:05:23 +08:00
store_id: 店铺ID为None时返回所有店铺数据
product_id: 产品ID为None时返回所有产品数据
start_date: 开始日期 (YYYY-MM-DD)
end_date: 结束日期 (YYYY-MM-DD)
返回:
DataFrame: 过滤后的销售数据
"""
# 尝试多个可能的文件路径
# 获取当前脚本所在的目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# 假设项目根目录是 server/utils 的上两级目录
project_root = os.path.abspath(os.path.join(current_dir, '..', '..'))
2025-07-02 11:05:23 +08:00
possible_paths = [
file_path, # 相对路径 (如果从根目录运行)
os.path.join(project_root, file_path), # 基于项目根目录的绝对路径
os.path.join('..', file_path), # 相对路径 (如果从 server 目录运行)
os.path.join('server', file_path) # 相对路径 (如果从根目录运行,但路径错误)
2025-07-02 11:05:23 +08:00
]
df = None
loaded_path = None
2025-07-02 11:05:23 +08:00
for path in possible_paths:
try:
if not os.path.exists(path):
continue
2025-07-02 11:05:23 +08:00
if path.endswith('.csv'):
df = pd.read_csv(path)
elif path.endswith('.xlsx'):
df = pd.read_excel(path)
elif path.endswith('.parquet'):
df = pd.read_parquet(path)
2025-07-02 11:05:23 +08:00
if df is not None:
loaded_path = path
print(f"成功加载数据文件: {loaded_path}")
2025-07-02 11:05:23 +08:00
break
except Exception as e:
print(f"加载文件 {path} 失败: {e}")
2025-07-02 11:05:23 +08:00
continue
if df is None:
raise FileNotFoundError(f"无法找到或加载数据文件,尝试的路径: {possible_paths}")
2025-07-02 11:05:23 +08:00
# 按店铺过滤
if store_id:
df = df[df['store_id'] == store_id].copy()
print(f"按店铺过滤: {store_id}, 剩余记录数: {len(df)}")
# 按产品过滤
if product_id:
df = df[df['product_id'] == product_id].copy()
print(f"按产品过滤: {product_id}, 剩余记录数: {len(df)}")
# 标准化列名和数据类型
df = standardize_column_names(df)
# 在标准化之后进行时间范围过滤
2025-07-02 11:05:23 +08:00
if start_date:
try:
start_date_dt = pd.to_datetime(start_date)
# 确保比较是在datetime对象之间
if 'date' in df.columns:
df = df[df['date'] >= start_date_dt].copy()
print(f"开始日期过滤: {start_date_dt}, 剩余记录数: {len(df)}")
except (ValueError, TypeError):
print(f"警告: 无效的开始日期格式 '{start_date}',已忽略。")
2025-07-02 11:05:23 +08:00
if end_date:
try:
end_date_dt = pd.to_datetime(end_date)
# 确保比较是在datetime对象之间
if 'date' in df.columns:
df = df[df['date'] <= end_date_dt].copy()
print(f"结束日期过滤: {end_date_dt}, 剩余记录数: {len(df)}")
except (ValueError, TypeError):
print(f"警告: 无效的结束日期格式 '{end_date}',已忽略。")
2025-07-02 11:05:23 +08:00
if len(df) == 0:
print("警告: 过滤后没有数据")
return df
def standardize_column_names(df: pd.DataFrame) -> pd.DataFrame:
"""
标准化列名以匹配训练代码和API期望的格式
2025-07-02 11:05:23 +08:00
参数:
df: 原始DataFrame
返回:
DataFrame: 标准化列名后的DataFrame
"""
df = df.copy()
# 定义列名映射并强制重命名
rename_map = {
'sales_quantity': 'sales', # 修复:匹配原始列名
'temperature_2m_mean': 'temperature', # 新增:处理温度列
'dayofweek': 'weekday' # 修复:匹配原始列名
2025-07-02 11:05:23 +08:00
}
df.rename(columns={k: v for k, v in rename_map.items() if k in df.columns}, inplace=True)
2025-07-02 11:05:23 +08:00
# 确保date列是datetime类型
2025-07-02 11:05:23 +08:00
if 'date' in df.columns:
df['date'] = pd.to_datetime(df['date'], errors='coerce')
df.dropna(subset=['date'], inplace=True) # 移除无法解析的日期行
else:
# 如果没有date列无法继续返回空DataFrame
return pd.DataFrame()
# 计算 sales_amount
# 由于没有price列sales_amount的计算逻辑需要调整或移除
# 这里我们注释掉它因为原始数据中已有sales_amount
# if 'sales_amount' not in df.columns and 'sales' in df.columns and 'price' in df.columns:
# # 先确保sales和price是数字
# df['sales'] = pd.to_numeric(df['sales'], errors='coerce')
# df['price'] = pd.to_numeric(df['price'], errors='coerce')
# df['sales_amount'] = df['sales'] * df['price']
# 创建缺失的特征列
if 'weekday' not in df.columns:
df['weekday'] = df['date'].dt.dayofweek
if 'month' not in df.columns:
df['month'] = df['date'].dt.month
# 添加缺失的元数据列
meta_columns = {
'store_name': 'Unknown Store',
'store_location': 'Unknown Location',
'store_type': 'Unknown',
'product_name': 'Unknown Product',
'product_category': 'Unknown Category'
}
for col, default in meta_columns.items():
if col not in df.columns:
df[col] = default
# 添加缺失的布尔特征列
2025-07-02 11:05:23 +08:00
default_features = {
'is_holiday': False,
'is_weekend': None,
'is_promotion': False,
'temperature': 20.0
2025-07-02 11:05:23 +08:00
}
for feature, default_value in default_features.items():
if feature not in df.columns:
if feature == 'is_weekend':
2025-07-02 11:05:23 +08:00
df['is_weekend'] = df['weekday'].isin([5, 6])
else:
df[feature] = default_value
# 确保数值类型正确
numeric_columns = ['sales', 'sales_amount', 'weekday', 'month', 'temperature']
2025-07-02 11:05:23 +08:00
for col in numeric_columns:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors='coerce')
# 确保布尔类型正确
boolean_columns = ['is_holiday', 'is_weekend', 'is_promotion']
for col in boolean_columns:
if col in df.columns:
df[col] = df[col].astype(bool)
print(f"数据标准化完成,可用特征列: {[col for col in ['sales', 'weekday', 'month', 'is_holiday', 'is_weekend', 'is_promotion', 'temperature'] if col in df.columns]}")
2025-07-02 11:05:23 +08:00
return df
def get_available_stores(file_path: str = 'data/timeseries_training_data_sample_10s50p.parquet') -> List[Dict[str, Any]]:
2025-07-02 11:05:23 +08:00
"""
获取可用的店铺列表
参数:
file_path: 数据文件路径
返回:
List[Dict]: 店铺信息列表
"""
try:
df = load_multi_store_data(file_path)
if 'store_id' not in df.columns:
print("数据文件中缺少 'store_id'")
return []
# 智能地获取店铺信息,即使某些列缺失
store_info = []
2025-07-02 11:05:23 +08:00
# 使用drop_duplicates获取唯一的店铺组合
stores_df = df.drop_duplicates(subset=['store_id'])
for _, row in stores_df.iterrows():
store_info.append({
'store_id': row['store_id'],
'store_name': row.get('store_name', f"店铺 {row['store_id']}"),
'location': row.get('store_location', '未知位置'),
'type': row.get('store_type', '标准'),
'opening_date': row.get('opening_date', '未知'),
})
return store_info
2025-07-02 11:05:23 +08:00
except Exception as e:
print(f"获取店铺列表失败: {e}")
return []
def get_available_products(file_path: str = 'data/timeseries_training_data_sample_10s50p.parquet',
2025-07-02 11:05:23 +08:00
store_id: Optional[str] = None) -> List[Dict[str, Any]]:
"""
获取可用的产品列表
参数:
file_path: 数据文件路径
store_id: 店铺ID为None时返回所有产品
返回:
List[Dict]: 产品信息列表
"""
try:
df = load_multi_store_data(file_path, store_id=store_id)
# 获取唯一产品信息
product_columns = ['product_id', 'product_name']
if 'product_category' in df.columns:
product_columns.append('product_category')
if 'unit_price' in df.columns:
product_columns.append('unit_price')
products = df[product_columns].drop_duplicates()
return products.to_dict('records')
except Exception as e:
print(f"获取产品列表失败: {e}")
return []
def get_store_product_sales_data(store_id: str,
product_id: str,
file_path: str = 'data/timeseries_training_data_sample_10s50p.parquet') -> pd.DataFrame:
2025-07-02 11:05:23 +08:00
"""
获取特定店铺和产品的销售数据用于模型训练
参数:
file_path: 数据文件路径
store_id: 店铺ID
product_id: 产品ID
返回:
DataFrame: 处理后的销售数据包含模型需要的特征
"""
# 加载数据
df = load_multi_store_data(file_path, store_id=store_id, product_id=product_id)
if len(df) == 0:
raise ValueError(f"没有找到店铺 {store_id} 产品 {product_id} 的销售数据")
# 确保数据按日期排序
df = df.sort_values('date').copy()
# 数据标准化已在load_multi_store_data中完成
# 验证必要的列是否存在
required_columns = ['sales', 'price', 'weekday', 'month', 'is_holiday', 'is_weekend', 'is_promotion', 'temperature']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
print(f"警告: 数据标准化后仍缺少列 {missing_columns}")
raise ValueError(f"无法获取完整的特征数据,缺少列: {missing_columns}")
# 定义模型训练所需的所有列(特征 + 目标)
final_columns = [
'date', 'sales', 'product_id', 'product_name', 'store_id', 'store_name',
'weekday', 'month', 'is_holiday', 'is_weekend', 'is_promotion', 'temperature'
]
# 筛选出DataFrame中实际存在的列
existing_columns = [col for col in final_columns if col in df.columns]
# 返回只包含这些必需列的DataFrame
return df[existing_columns]
2025-07-02 11:05:23 +08:00
def aggregate_multi_store_data(product_id: Optional[str] = None,
store_id: Optional[str] = None,
aggregation_method: str = 'sum',
file_path: str = 'data/timeseries_training_data_sample_10s50p.parquet') -> pd.DataFrame:
2025-07-02 11:05:23 +08:00
"""
聚合销售数据可按产品全局或按店铺所有产品
2025-07-02 11:05:23 +08:00
参数:
file_path: 数据文件路径
product_id: 产品ID (用于全局模型)
store_id: 店铺ID (用于店铺聚合模型)
2025-07-02 11:05:23 +08:00
aggregation_method: 聚合方法 ('sum', 'mean', 'median')
返回:
DataFrame: 聚合后的销售数据
"""
# 根据是全局聚合、店铺聚合还是真正全局聚合来加载数据
if store_id:
# 店铺聚合:加载该店铺的所有数据
df = load_multi_store_data(file_path, store_id=store_id)
if len(df) == 0:
raise ValueError(f"没有找到店铺 {store_id} 的销售数据")
grouping_entity = f"店铺 {store_id}"
elif product_id:
# 按产品聚合:加载该产品在所有店铺的数据
df = load_multi_store_data(file_path, product_id=product_id)
if len(df) == 0:
raise ValueError(f"没有找到产品 {product_id} 的销售数据")
grouping_entity = f"产品 {product_id}"
else:
# 真正全局聚合:加载所有数据
df = load_multi_store_data(file_path)
if len(df) == 0:
raise ValueError("数据文件为空,无法进行全局聚合")
grouping_entity = "所有产品"
2025-07-02 11:05:23 +08:00
# 按日期聚合(使用标准化后的列名)
agg_dict = {}
if aggregation_method == 'sum':
agg_dict = {
'sales': 'sum', # 标准化后的销量列
'sales_amount': 'sum',
'price': 'mean' # 标准化后的价格列,取平均值
}
elif aggregation_method == 'mean':
agg_dict = {
'sales': 'mean',
'sales_amount': 'mean',
'price': 'mean'
}
elif aggregation_method == 'median':
agg_dict = {
'sales': 'median',
'sales_amount': 'median',
'price': 'median'
}
# 确保列名存在
available_cols = df.columns.tolist()
agg_dict = {k: v for k, v in agg_dict.items() if k in available_cols}
# 聚合数据
aggregated_df = df.groupby('date').agg(agg_dict).reset_index()
# 获取产品信息(取第一个店铺的信息)
product_info = df[['product_id', 'product_name', 'product_category']].iloc[0]
for col, val in product_info.items():
aggregated_df[col] = val
# 添加店铺信息标识为全局
aggregated_df['store_id'] = 'GLOBAL'
aggregated_df['store_name'] = f'全部店铺-{aggregation_method.upper()}'
aggregated_df['store_location'] = '全局聚合'
aggregated_df['store_type'] = 'global'
# 对聚合后的数据进行标准化(添加缺失的特征列)
aggregated_df = aggregated_df.sort_values('date').copy()
aggregated_df = standardize_column_names(aggregated_df)
# 定义模型训练所需的所有列(特征 + 目标)
final_columns = [
'date', 'sales', 'product_id', 'product_name', 'store_id', 'store_name',
'weekday', 'month', 'is_holiday', 'is_weekend', 'is_promotion', 'temperature'
]
# 筛选出DataFrame中实际存在的列
existing_columns = [col for col in final_columns if col in aggregated_df.columns]
# 返回只包含这些必需列的DataFrame
return aggregated_df[existing_columns]
2025-07-02 11:05:23 +08:00
def get_sales_statistics(file_path: str = 'data/timeseries_training_data_sample_10s50p.parquet',
2025-07-02 11:05:23 +08:00
store_id: Optional[str] = None,
product_id: Optional[str] = None) -> Dict[str, Any]:
"""
获取销售数据统计信息
参数:
file_path: 数据文件路径
store_id: 店铺ID
product_id: 产品ID
返回:
Dict: 统计信息
"""
try:
df = load_multi_store_data(file_path, store_id=store_id, product_id=product_id)
if len(df) == 0:
return {'error': '没有数据'}
stats = {
'total_records': len(df),
'date_range': {
'start': df['date'].min().strftime('%Y-%m-%d'),
'end': df['date'].max().strftime('%Y-%m-%d')
},
'stores': df['store_id'].nunique(),
'products': df['product_id'].nunique(),
'total_sales_amount': float(df['sales_amount'].sum()) if 'sales_amount' in df.columns else 0,
'total_quantity': int(df['quantity_sold'].sum()) if 'quantity_sold' in df.columns else 0,
'avg_daily_sales': float(df.groupby('date')['quantity_sold'].sum().mean()) if 'quantity_sold' in df.columns else 0
}
return stats
except Exception as e:
return {'error': str(e)}
# 向后兼容的函数
def load_data(file_path='pharmacy_sales.xlsx', store_id=None):
"""
向后兼容的数据加载函数
"""
return load_multi_store_data(file_path, store_id=store_id)