ShopTRAINING/server/utils/multi_store_data_utils.py
xz2000 b1b697117b **日期**: 2025-07-14
**主题**: UI导航栏重构

### 描述
根据用户请求,对左侧功能导航栏进行了调整。

### 主要改动
1.  **删除“数据管理”**:
    *   从 `UI/src/App.vue` 的导航菜单中移除了“数据管理”项。
    *   从 `UI/src/router/index.js` 中删除了对应的 `/data` 路由。
    *   删除了视图文件 `UI/src/views/DataView.vue`。

2.  **提升“店铺管理”**:
    *   将“店铺管理”菜单项在 `UI/src/App.vue` 中的位置提升,以填补原“数据管理”的位置,使其在导航中更加突出。

### 涉及文件
*   `UI/src/App.vue`
*   `UI/src/router/index.js`
*   `UI/src/views/DataView.vue` (已删除)

**按药品模型预测**
---
**日期**: 2025-07-14
**主题**: 修复导航菜单高亮问题

### 描述
修复了首次进入或刷新页面时,左侧导航菜单项与当前路由不匹配导致不高亮的问题。

### 主要改动
*   **文件**: `UI/src/App.vue`
*   **修改**:
    1.  引入 `useRoute` 和 `computed`。
    2.  创建了一个计算属性 `activeMenu`,其值动态地等于当前路由的路径 (`route.path`)。
    3.  将 `el-menu` 组件的 `:default-active` 属性绑定到 `activeMenu`。

### 结果
确保了导航菜单的高亮状态始终与当前页面的URL保持同步。

---
**日期**: 2025-07-15
**主题**: 修复硬编码文件路径问题,提高项目可移植性

### 问题描述
项目在从一台计算机迁移到另一台时,由于数据文件路径被硬编码在代码中,导致程序无法找到数据文件而运行失败。

### 根本原因
多个Python文件(`predictor.py`, `multi_store_data_utils.py`)中直接写入了相对路径 `'data/timeseries_training_data_sample_10s50p.parquet'` 作为默认值。这种方式在不同运行环境下(如从根目录运行 vs 从子目录运行)会产生路径解析错误。

### 解决方案:集中配置,统一管理
1.  **修改 `server/core/config.py` (核心)**:
    *   动态计算并定义了一个全局变量 `PROJECT_ROOT`,它始终指向项目的根目录。
    *   基于 `PROJECT_ROOT`,使用 `os.path.join` 创建了一个跨平台的、绝对的默认数据路径 `DEFAULT_DATA_PATH` 和模型保存路径 `DEFAULT_MODEL_DIR`。
    *   这确保了无论从哪个位置执行代码,路径总能被正确解析。

2.  **修改 `server/utils/multi_store_data_utils.py`**:
    *   从 `server/core/config` 导入 `DEFAULT_DATA_PATH`。
    *   将所有数据加载函数的 `file_path` 参数的默认值从硬编码的字符串改为 `None`。
    *   在函数内部,如果 `file_path` 为 `None`,则自动使用导入的 `DEFAULT_DATA_PATH`。
    *   移除了原有的、复杂的、为了猜测正确路径而编写的冗余代码。

3.  **修改 `server/core/predictor.py`**:
    *   同样从 `server/core/config` 导入 `DEFAULT_DATA_PATH`。
    *   在初始化 `PharmacyPredictor` 时,如果未提供数据路径,则使用导入的 `DEFAULT_DATA_PATH` 作为默认值。

### 最终结果
通过将数据源路径集中到唯一的配置文件中进行管理,彻底解决了因硬编码路径导致的可移植性问题。项目现在可以在任何环境下可靠地运行。

---
### 未来如何修改数据源(例如,连接到服务器数据库)

本次重构为将来更换数据源打下了坚实的基础。操作非常简单:

1.  **定位配置文件**: 打开 `server/core/config.py` 文件。

2.  **修改数据源定义**:
    *   **当前 (文件)**:
        ```python
        DEFAULT_DATA_PATH = os.path.join(PROJECT_ROOT, 'data', 'timeseries_training_data_sample_10s50p.parquet')
        ```
    *   **未来 (数据库示例)**:
        您可以将这行替换为数据库连接字符串,或者添加新的数据库配置变量。例如:
        ```python
        # 注释掉或删除旧的文件路径配置
        # DEFAULT_DATA_PATH = ...

        # 新增数据库连接配置
        DATABASE_URL = "postgresql://user:password@your_server_ip:5432/your_database_name"
        ```

3.  **修改数据加载逻辑**:
    *   **定位数据加载函数**: 打开 `server/utils/multi_store_data_utils.py`。
    *   **修改 `load_multi_store_data` 函数**:
        *   引入数据库连接库(如 `sqlalchemy` 或 `psycopg2`)。
        *   修改函数逻辑,使其使用 `config.py` 中的 `DATABASE_URL` 来连接数据库,并执行SQL查询来获取数据,而不是读取文件。
        *   **示例**:
            ```python
            from sqlalchemy import create_engine
            from core.config import DATABASE_URL # 导入新的数据库配置

            def load_multi_store_data(...):
                # ...
                engine = create_engine(DATABASE_URL)
                query = "SELECT * FROM sales_data" # 根据需要构建查询
                df = pd.read_sql(query, engine)
                # ... 后续处理逻辑保持不变 ...
            ```
2025-07-15 10:37:33 +08:00

424 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
多店铺销售预测系统 - 数据处理工具函数
支持多店铺数据的加载、过滤和处理
"""
import pandas as pd
import numpy as np
import os
from datetime import datetime, timedelta
from typing import Optional, List, Tuple, Dict, Any
from core.config import DEFAULT_DATA_PATH
def load_multi_store_data(file_path: str = None,
store_id: Optional[str] = None,
product_id: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None) -> pd.DataFrame:
"""
加载多店铺销售数据,支持按店铺、产品、时间范围过滤
参数:
file_path: 数据文件路径 (支持 .csv, .xlsx, .parquet)。如果为None则使用config中定义的默认路径。
store_id: 店铺ID为None时返回所有店铺数据
product_id: 产品ID为None时返回所有产品数据
start_date: 开始日期 (YYYY-MM-DD)
end_date: 结束日期 (YYYY-MM-DD)
返回:
DataFrame: 过滤后的销售数据
"""
# 如果未提供文件路径,则使用配置文件中的默认路径
if file_path is None:
file_path = DEFAULT_DATA_PATH
if not os.path.exists(file_path):
raise FileNotFoundError(f"数据文件不存在: {file_path}")
try:
if file_path.endswith('.csv'):
df = pd.read_csv(file_path)
elif file_path.endswith('.xlsx'):
df = pd.read_excel(file_path)
elif file_path.endswith('.parquet'):
df = pd.read_parquet(file_path)
else:
raise ValueError(f"不支持的文件格式: {file_path}")
print(f"成功加载数据文件: {file_path}")
except Exception as e:
print(f"加载文件 {file_path} 失败: {e}")
raise
# 按店铺过滤
if store_id:
df = df[df['store_id'] == store_id].copy()
print(f"按店铺过滤: {store_id}, 剩余记录数: {len(df)}")
# 按产品过滤
if product_id:
df = df[df['product_id'] == product_id].copy()
print(f"按产品过滤: {product_id}, 剩余记录数: {len(df)}")
# 标准化列名和数据类型
df = standardize_column_names(df)
# 在标准化之后进行时间范围过滤
if start_date:
try:
start_date_dt = pd.to_datetime(start_date)
# 确保比较是在datetime对象之间
if 'date' in df.columns:
df = df[df['date'] >= start_date_dt].copy()
print(f"开始日期过滤: {start_date_dt}, 剩余记录数: {len(df)}")
except (ValueError, TypeError):
print(f"警告: 无效的开始日期格式 '{start_date}',已忽略。")
if end_date:
try:
end_date_dt = pd.to_datetime(end_date)
# 确保比较是在datetime对象之间
if 'date' in df.columns:
df = df[df['date'] <= end_date_dt].copy()
print(f"结束日期过滤: {end_date_dt}, 剩余记录数: {len(df)}")
except (ValueError, TypeError):
print(f"警告: 无效的结束日期格式 '{end_date}',已忽略。")
if len(df) == 0:
print("警告: 过滤后没有数据")
return df
def standardize_column_names(df: pd.DataFrame) -> pd.DataFrame:
"""
标准化列名以匹配训练代码和API期望的格式
参数:
df: 原始DataFrame
返回:
DataFrame: 标准化列名后的DataFrame
"""
df = df.copy()
# 定义列名映射并强制重命名
rename_map = {
'sales_quantity': 'sales', # 修复:匹配原始列名
'temperature_2m_mean': 'temperature', # 新增:处理温度列
'dayofweek': 'weekday' # 修复:匹配原始列名
}
df.rename(columns={k: v for k, v in rename_map.items() if k in df.columns}, inplace=True)
# 确保date列是datetime类型
if 'date' in df.columns:
df['date'] = pd.to_datetime(df['date'], errors='coerce')
df.dropna(subset=['date'], inplace=True) # 移除无法解析的日期行
else:
# 如果没有date列无法继续返回空DataFrame
return pd.DataFrame()
# 计算 sales_amount
# 由于没有price列sales_amount的计算逻辑需要调整或移除
# 这里我们注释掉它因为原始数据中已有sales_amount
# if 'sales_amount' not in df.columns and 'sales' in df.columns and 'price' in df.columns:
# # 先确保sales和price是数字
# df['sales'] = pd.to_numeric(df['sales'], errors='coerce')
# df['price'] = pd.to_numeric(df['price'], errors='coerce')
# df['sales_amount'] = df['sales'] * df['price']
# 创建缺失的特征列
if 'weekday' not in df.columns:
df['weekday'] = df['date'].dt.dayofweek
if 'month' not in df.columns:
df['month'] = df['date'].dt.month
# 添加缺失的元数据列
meta_columns = {
'store_name': 'Unknown Store',
'store_location': 'Unknown Location',
'store_type': 'Unknown',
'product_name': 'Unknown Product',
'product_category': 'Unknown Category'
}
for col, default in meta_columns.items():
if col not in df.columns:
df[col] = default
# 添加缺失的布尔特征列
default_features = {
'is_holiday': False,
'is_weekend': None,
'is_promotion': False,
'temperature': 20.0
}
for feature, default_value in default_features.items():
if feature not in df.columns:
if feature == 'is_weekend':
df['is_weekend'] = df['weekday'].isin([5, 6])
else:
df[feature] = default_value
# 确保数值类型正确
numeric_columns = ['sales', 'sales_amount', 'weekday', 'month', 'temperature']
for col in numeric_columns:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors='coerce')
# 确保布尔类型正确
boolean_columns = ['is_holiday', 'is_weekend', 'is_promotion']
for col in boolean_columns:
if col in df.columns:
df[col] = df[col].astype(bool)
print(f"数据标准化完成,可用特征列: {[col for col in ['sales', 'weekday', 'month', 'is_holiday', 'is_weekend', 'is_promotion', 'temperature'] if col in df.columns]}")
return df
def get_available_stores(file_path: str = None) -> List[Dict[str, Any]]:
"""
获取可用的店铺列表
参数:
file_path: 数据文件路径
返回:
List[Dict]: 店铺信息列表
"""
try:
df = load_multi_store_data(file_path)
if 'store_id' not in df.columns:
print("数据文件中缺少 'store_id'")
return []
# 智能地获取店铺信息,即使某些列缺失
store_info = []
# 使用drop_duplicates获取唯一的店铺组合
stores_df = df.drop_duplicates(subset=['store_id'])
for _, row in stores_df.iterrows():
store_info.append({
'store_id': row['store_id'],
'store_name': row.get('store_name', f"店铺 {row['store_id']}"),
'location': row.get('store_location', '未知位置'),
'type': row.get('store_type', '标准'),
'opening_date': row.get('opening_date', '未知'),
})
return store_info
except Exception as e:
print(f"获取店铺列表失败: {e}")
return []
def get_available_products(file_path: str = None,
store_id: Optional[str] = None) -> List[Dict[str, Any]]:
"""
获取可用的产品列表
参数:
file_path: 数据文件路径
store_id: 店铺ID为None时返回所有产品
返回:
List[Dict]: 产品信息列表
"""
try:
df = load_multi_store_data(file_path, store_id=store_id)
# 获取唯一产品信息
product_columns = ['product_id', 'product_name']
if 'product_category' in df.columns:
product_columns.append('product_category')
if 'unit_price' in df.columns:
product_columns.append('unit_price')
products = df[product_columns].drop_duplicates()
return products.to_dict('records')
except Exception as e:
print(f"获取产品列表失败: {e}")
return []
def get_store_product_sales_data(store_id: str,
product_id: str,
file_path: str = None) -> pd.DataFrame:
"""
获取特定店铺和产品的销售数据,用于模型训练
参数:
file_path: 数据文件路径
store_id: 店铺ID
product_id: 产品ID
返回:
DataFrame: 处理后的销售数据,包含模型需要的特征
"""
# 加载数据
df = load_multi_store_data(file_path, store_id=store_id, product_id=product_id)
if len(df) == 0:
raise ValueError(f"没有找到店铺 {store_id} 产品 {product_id} 的销售数据")
# 确保数据按日期排序
df = df.sort_values('date').copy()
# 数据标准化已在load_multi_store_data中完成
# 验证必要的列是否存在
required_columns = ['sales', 'price', 'weekday', 'month', 'is_holiday', 'is_weekend', 'is_promotion', 'temperature']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
print(f"警告: 数据标准化后仍缺少列 {missing_columns}")
raise ValueError(f"无法获取完整的特征数据,缺少列: {missing_columns}")
# 定义模型训练所需的所有列(特征 + 目标)
final_columns = [
'date', 'sales', 'product_id', 'product_name', 'store_id', 'store_name',
'weekday', 'month', 'is_holiday', 'is_weekend', 'is_promotion', 'temperature'
]
# 筛选出DataFrame中实际存在的列
existing_columns = [col for col in final_columns if col in df.columns]
# 返回只包含这些必需列的DataFrame
return df[existing_columns]
def aggregate_multi_store_data(product_id: Optional[str] = None,
store_id: Optional[str] = None,
aggregation_method: str = 'sum',
file_path: str = None) -> pd.DataFrame:
"""
聚合销售数据,可按产品(全局)或按店铺(所有产品)
参数:
file_path: 数据文件路径
product_id: 产品ID (用于全局模型)
store_id: 店铺ID (用于店铺聚合模型)
aggregation_method: 聚合方法 ('sum', 'mean', 'median')
返回:
DataFrame: 聚合后的销售数据
"""
# 根据是全局聚合、店铺聚合还是真正全局聚合来加载数据
if store_id:
# 店铺聚合:加载该店铺的所有数据
df = load_multi_store_data(file_path, store_id=store_id)
if len(df) == 0:
raise ValueError(f"没有找到店铺 {store_id} 的销售数据")
grouping_entity = f"店铺 {store_id}"
elif product_id:
# 按产品聚合:加载该产品在所有店铺的数据
df = load_multi_store_data(file_path, product_id=product_id)
if len(df) == 0:
raise ValueError(f"没有找到产品 {product_id} 的销售数据")
grouping_entity = f"产品 {product_id}"
else:
# 真正全局聚合:加载所有数据
df = load_multi_store_data(file_path)
if len(df) == 0:
raise ValueError("数据文件为空,无法进行全局聚合")
grouping_entity = "所有产品"
# 按日期聚合(使用标准化后的列名)
agg_dict = {}
if aggregation_method == 'sum':
agg_dict = {
'sales': 'sum', # 标准化后的销量列
'sales_amount': 'sum',
'price': 'mean' # 标准化后的价格列,取平均值
}
elif aggregation_method == 'mean':
agg_dict = {
'sales': 'mean',
'sales_amount': 'mean',
'price': 'mean'
}
elif aggregation_method == 'median':
agg_dict = {
'sales': 'median',
'sales_amount': 'median',
'price': 'median'
}
# 确保列名存在
available_cols = df.columns.tolist()
agg_dict = {k: v for k, v in agg_dict.items() if k in available_cols}
# 聚合数据
aggregated_df = df.groupby('date').agg(agg_dict).reset_index()
# 获取产品信息(取第一个店铺的信息)
product_info = df[['product_id', 'product_name', 'product_category']].iloc[0]
for col, val in product_info.items():
aggregated_df[col] = val
# 添加店铺信息标识为全局
aggregated_df['store_id'] = 'GLOBAL'
aggregated_df['store_name'] = f'全部店铺-{aggregation_method.upper()}'
aggregated_df['store_location'] = '全局聚合'
aggregated_df['store_type'] = 'global'
# 对聚合后的数据进行标准化(添加缺失的特征列)
aggregated_df = aggregated_df.sort_values('date').copy()
aggregated_df = standardize_column_names(aggregated_df)
# 定义模型训练所需的所有列(特征 + 目标)
final_columns = [
'date', 'sales', 'product_id', 'product_name', 'store_id', 'store_name',
'weekday', 'month', 'is_holiday', 'is_weekend', 'is_promotion', 'temperature'
]
# 筛选出DataFrame中实际存在的列
existing_columns = [col for col in final_columns if col in aggregated_df.columns]
# 返回只包含这些必需列的DataFrame
return aggregated_df[existing_columns]
def get_sales_statistics(file_path: str = None,
store_id: Optional[str] = None,
product_id: Optional[str] = None) -> Dict[str, Any]:
"""
获取销售数据统计信息
参数:
file_path: 数据文件路径
store_id: 店铺ID
product_id: 产品ID
返回:
Dict: 统计信息
"""
try:
df = load_multi_store_data(file_path, store_id=store_id, product_id=product_id)
if len(df) == 0:
return {'error': '没有数据'}
stats = {
'total_records': len(df),
'date_range': {
'start': df['date'].min().strftime('%Y-%m-%d'),
'end': df['date'].max().strftime('%Y-%m-%d')
},
'stores': df['store_id'].nunique(),
'products': df['product_id'].nunique(),
'total_sales_amount': float(df['sales_amount'].sum()) if 'sales_amount' in df.columns else 0,
'total_quantity': int(df['quantity_sold'].sum()) if 'quantity_sold' in df.columns else 0,
'avg_daily_sales': float(df.groupby('date')['quantity_sold'].sum().mean()) if 'quantity_sold' in df.columns else 0
}
return stats
except Exception as e:
return {'error': str(e)}
# 向后兼容的函数
def load_data(file_path=None, store_id=None):
"""
向后兼容的数据加载函数
"""
return load_multi_store_data(file_path, store_id=store_id)