ShopTRAINING/server/analysis/trend_analysis.py

225 lines
6.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
药店销售预测系统 - 趋势分析函数
"""
import numpy as np
import pandas as pd
from scipy import stats
def analyze_prediction_result(product_id, model_type, predictions, features=None):
    """Analyse a forecast and bundle the resulting insights into one dict.

    Args:
        product_id: Product identifier; forwarded to the factor analysis and
            the explanation text.
        model_type: Model type; forwarded to the factor analysis and the
            explanation text.
        predictions: Predicted values to analyse.
        features: Optional feature data. When it is not ``None`` an
            ``'influencing_factors'`` entry is added to the result.

    Returns:
        dict with the keys ``'trend'``, ``'statistics'``, ``'day_over_day'``,
        optionally ``'influencing_factors'``, and ``'explanation'``.
    """
    # The three series-level analyses only need the predictions themselves.
    report = {
        'trend': analyze_trend(predictions),
        'statistics': calculate_prediction_statistics(predictions),
        'day_over_day': calculate_day_over_day_changes(predictions),
    }

    # Factor analysis is only possible when feature data was supplied.
    if features is not None:
        report['influencing_factors'] = analyze_influencing_factors(
            product_id, model_type, predictions, features
        )

    # The explanation is generated last because it reads the entries above.
    report['explanation'] = generate_prediction_explanation(
        report, product_id, model_type
    )
    return report
# Labels stored under 'volatility_level'. analyze_trend() produces them and
# generate_prediction_explanation() branches on them, so both sides must use
# the very same values.
_VOLATILITY_LOW = "低"
_VOLATILITY_MEDIUM = "中"
_VOLATILITY_HIGH = "高"


def analyze_trend(predictions):
    """Describe the direction and the volatility of a forecast series.

    Args:
        predictions: Sequence of predicted values.

    Returns:
        dict with the keys:
            ``'slope'``: slope of a least-squares line fitted against the
                positions 0..n-1.
            ``'trend_type'``: "上升" when the slope is above 0.05, "下降" when
                it is below -0.05, otherwise "平稳".
            ``'r_squared'``: squared correlation coefficient of the fit.
            ``'p_value'``: p-value reported by the regression.
            ``'volatility'``: standard deviation divided by the mean, or 0
                when the mean is not positive.
            ``'volatility_level'``: "低" below 0.1, "中" below 0.3, else "高".

        With fewer than two points no line can be fitted; the slope and
        r_squared are then reported as 0.0 and the p_value as 1.0.
    """
    values = np.asarray(predictions, dtype=float)
    n_points = len(values)

    # Linear regression needs at least two points; fall back to "no trend"
    # instead of raising or propagating NaN for shorter series.
    if n_points >= 2:
        x = np.arange(n_points)
        slope, _intercept, r_value, p_value, _std_err = stats.linregress(x, values)
        slope = float(slope)
        r_squared = float(r_value) ** 2
        p_value = float(p_value)
    else:
        slope, r_squared, p_value = 0.0, 0.0, 1.0

    # Classify the trend direction from the slope.
    if slope > 0.05:
        trend_type = "上升"
    elif slope < -0.05:
        trend_type = "下降"
    else:
        trend_type = "平稳"

    # Volatility is the coefficient of variation; it is only meaningful for a
    # positive mean, and an empty series has no mean at all.
    mean_value = float(np.mean(values)) if n_points else 0.0
    volatility = float(np.std(values)) / mean_value if mean_value > 0 else 0.0

    # Each band gets its own label (previously all three were empty strings).
    if volatility < 0.1:
        volatility_level = _VOLATILITY_LOW
    elif volatility < 0.3:
        volatility_level = _VOLATILITY_MEDIUM
    else:
        volatility_level = _VOLATILITY_HIGH

    return {
        'slope': slope,
        'trend_type': trend_type,
        'r_squared': r_squared,
        'p_value': p_value,
        'volatility': volatility,
        'volatility_level': volatility_level
    }


def calculate_prediction_statistics(predictions):
    """Compute summary statistics of a forecast series.

    Args:
        predictions: Sequence of predicted values.

    Returns:
        dict with the keys ``'mean'``, ``'median'``, ``'min'``, ``'max'``,
        ``'std'``, ``'q1'`` (25th percentile) and ``'q3'`` (75th percentile),
        each computed with the corresponding NumPy function.
    """
    return {
        'mean': np.mean(predictions),
        'median': np.median(predictions),
        'min': np.min(predictions),
        'max': np.max(predictions),
        'std': np.std(predictions),
        'q1': np.percentile(predictions, 25),
        'q3': np.percentile(predictions, 75)
    }


def calculate_day_over_day_changes(predictions):
    """Compute the percentage change between consecutive predictions.

    Args:
        predictions: Sequence of predicted values.

    Returns:
        list with one entry per consecutive pair (so one element shorter than
        ``predictions``). Each entry is the change relative to the previous
        value, in percent. When the previous value is 0 the entry is
        ``float('inf')`` if the current value is positive, otherwise 0.
    """
    changes = []
    for i in range(1, len(predictions)):
        previous, current = predictions[i - 1], predictions[i]
        if previous != 0:
            pct_change = (current - previous) / previous * 100
        else:
            # A relative change from zero is undefined; report infinity for
            # growth and 0 otherwise.
            pct_change = float('inf') if current > 0 else 0
        changes.append(pct_change)
    return changes


def analyze_influencing_factors(product_id, model_type, predictions, features):
    """Report basic information about the factors behind a forecast.

    This is a simple placeholder: the list of important features is
    hard-coded per product ID rather than derived from the data.

    Args:
        product_id: Product identifier.
        model_type: Model type.
        predictions: Predicted values (not used by the current implementation).
        features: Feature data. If it has a ``shape`` attribute the feature
            count is ``shape[1]``, otherwise ``len(features)``.

    Returns:
        dict with the keys ``'product_id'``, ``'model_type'``,
        ``'feature_count'`` and ``'important_features'``.
    """
    if hasattr(features, 'shape'):
        feature_count = features.shape[1]
    else:
        feature_count = len(features)

    if product_id in ['P001', 'P004']:
        important_features = ['温度', '促销', '季节性']
    else:
        important_features = ['价格', '周末', '节假日']

    return {
        'product_id': product_id,
        'model_type': model_type,
        'feature_count': feature_count,
        'important_features': important_features
    }


def generate_prediction_explanation(analysis, product_id, model_type):
    """Build the human-readable explanation text for an analysis result.

    Args:
        analysis: dict holding a ``'trend'`` entry (with ``'trend_type'``,
            ``'slope'`` and ``'volatility_level'``), a ``'statistics'`` entry
            (with ``'mean'``, ``'max'`` and ``'min'``) and optionally an
            ``'influencing_factors'`` entry.
        product_id: Product identifier shown in the heading line.
        model_type: Model type shown in the heading line.

    Returns:
        Multi-line explanation string: heading, trend sentence, volatility
        sentence, statistics sentence and, when available, the list of
        important features.
    """
    trend = analysis['trend']
    # Named `summary` rather than `stats` so the scipy.stats import used
    # elsewhere in this module is not shadowed.
    summary = analysis['statistics']

    # Trend sentence.
    trend_text = f"预测显示销量整体呈{trend['trend_type']}趋势,"
    if trend['trend_type'] == "上升":
        trend_text += f"平均每天增长约{abs(trend['slope']):.2f}个单位。"
    elif trend['trend_type'] == "下降":
        trend_text += f"平均每天下降约{abs(trend['slope']):.2f}个单位。"
    else:
        trend_text += "销量基本保持稳定。"

    # Volatility sentence; the branches match the labels analyze_trend() emits.
    volatility_level = trend['volatility_level']
    volatility_text = f"预测期内销量波动性{volatility_level}"
    if volatility_level == _VOLATILITY_HIGH:
        volatility_text += "表明销量可能受到多种因素的影响,预测的不确定性较高。"
    elif volatility_level == _VOLATILITY_MEDIUM:
        volatility_text += "表明销量有一定波动,但整体可控。"
    else:
        volatility_text += "表明销量相对稳定,预测可信度较高。"

    # Statistics sentence.
    stats_text = f"预测期内平均日销量为{summary['mean']:.2f}个单位,"
    stats_text += f"最高日销量为{summary['max']:.2f}个单位,最低日销量为{summary['min']:.2f}个单位。"

    # Assemble the final text.
    explanation = f"{model_type}模型对产品{product_id}的预测分析:\n"
    explanation += trend_text + "\n"
    explanation += volatility_text + "\n"
    explanation += stats_text

    # Append the important features when a factor analysis is present.
    factors = analysis.get('influencing_factors')
    if factors is not None and 'important_features' in factors:
        explanation += f"\n\n主要影响因素包括:{', '.join(factors['important_features'])}"

    return explanation
def compare_with_historical(predictions, product_df):
    """Compare the average forecast with the average of recent history.

    Args:
        predictions: Sequence of predicted values.
        product_df: Historical data; its ``'sales'`` column is read.

    Returns:
        dict with the keys:
            ``'avg_historical'``: mean of the historical sales used for the
                comparison, or ``None`` when there is no history.
            ``'avg_predicted'``: mean of ``predictions``.
            ``'avg_diff'``: predicted mean minus historical mean (0 when
                there is no history).
            ``'avg_pct_diff'``: ``avg_diff`` as a percentage of the historical
                mean; ``float('inf')`` when that mean is 0, and 0 when there
                is no history.
    """
    sales = product_df['sales'].values
    horizon = len(predictions)

    # Use the most recent `horizon` observations when enough history exists,
    # otherwise fall back to everything that is available.
    historical_sales = sales[-horizon:] if len(sales) >= horizon else sales

    # Computed up front: the result always reports it, including when the
    # history is empty (that path used to fail on an unassigned variable).
    avg_predicted = np.mean(predictions)

    if len(historical_sales) > 0:
        avg_historical = np.mean(historical_sales)
        avg_diff = avg_predicted - avg_historical
        # A relative difference against a zero baseline is undefined.
        avg_pct_diff = (avg_diff / avg_historical) * 100 if avg_historical != 0 else float('inf')
    else:
        avg_historical = None
        avg_diff = 0
        avg_pct_diff = 0

    return {
        'avg_historical': avg_historical,
        'avg_predicted': avg_predicted,
        'avg_diff': avg_diff,
        'avg_pct_diff': avg_pct_diff
    }