20 KiB
20 KiB
药店单品销售预测系统 - 模型架构与设计
1. 模型概述
药店单品销售预测系统采用了多种先进的深度学习模型,包括Transformer、矩阵LSTM(mLSTM)和Kolmogorov-Arnold网络(KAN)。这些模型针对时间序列预测任务进行了专门优化,能够有效捕捉销售数据中的各种模式,如季节性、趋势性和突发性变化。
2. 数据预处理流程
2.1 特征工程
系统使用以下特征进行预测:
- 销售量: 目标变量,也作为历史特征输入
- 价格: 产品价格
- 星期几: 0-6表示周一至周日
- 月份: 1-12表示一年中的月份
- 节假日标志: 是否为节假日(0或1)
- 周末标志: 是否为周末(0或1)
- 促销标志: 是否有促销活动(0或1)
- 气温: 当日温度
2.2 数据标准化
所有特征使用MinMaxScaler进行标准化,将值映射到[0,1]区间:
scaler_X = MinMaxScaler(feature_range=(0, 1))
scaler_y = MinMaxScaler(feature_range=(0, 1))
X_scaled = scaler_X.fit_transform(X)
y_scaled = scaler_y.fit_transform(y)
2.3 时间序列数据集创建
系统将原始数据转换为滑动窗口格式的时间序列数据:
def create_dataset(X, y, look_back, future_days):
"""
创建时间序列数据集
参数:
X: 输入特征
y: 目标变量
look_back: 使用过去多少天的数据进行预测
future_days: 预测未来多少天
返回:
X_out: 形状为 (samples, look_back, features)
y_out: 形状为 (samples, future_days)
"""
X_out, y_out = [], []
for i in range(len(X) - look_back - future_days + 1):
X_out.append(X[i:(i + look_back)])
y_out.append(y[i + look_back:i + look_back + future_days, 0])
return np.array(X_out), np.array(y_out)
3. Transformer模型
3.1 架构设计
Transformer模型基于"Attention is All You Need"论文提出的架构,但针对时间序列预测进行了优化。
输出序列
↑
线性层+激活
↑
Transformer解码器
↑
Transformer编码器
↑
位置编码+嵌入
↑
输入序列
3.2 核心组件
3.2.1 位置编码
位置编码使模型能够感知序列中的位置信息:
def positional_encoding(seq_len, d_model):
positions = np.arange(seq_len)[:, np.newaxis]
angles = np.arange(d_model)[np.newaxis, :] / np.power(10000, 2 * (np.arange(d_model)[np.newaxis, :] // 2) / d_model)
# 应用sin函数到偶数位置
sines = np.sin(positions * angles[:, 0::2])
# 应用cos函数到奇数位置
cosines = np.cos(positions * angles[:, 1::2])
# 合并sin和cos
pos_encoding = np.zeros((seq_len, d_model))
pos_encoding[:, 0::2] = sines
pos_encoding[:, 1::2] = cosines
return torch.FloatTensor(pos_encoding)
3.2.2 多头自注意力机制
多头自注意力机制允许模型同时关注序列中的不同位置:
class MultiHeadAttention(nn.Module):
def __init__(self, d_model, num_heads):
super().__init__()
self.d_model = d_model
self.num_heads = num_heads
self.head_dim = d_model // num_heads
self.query = nn.Linear(d_model, d_model)
self.key = nn.Linear(d_model, d_model)
self.value = nn.Linear(d_model, d_model)
self.fc_out = nn.Linear(d_model, d_model)
def forward(self, query, key, value, mask=None):
batch_size = query.shape[0]
# 线性变换
Q = self.query(query)
K = self.key(key)
V = self.value(value)
# 分割成多头
Q = Q.view(batch_size, -1, self.num_heads, self.head_dim).permute(0, 2, 1, 3)
K = K.view(batch_size, -1, self.num_heads, self.head_dim).permute(0, 2, 1, 3)
V = V.view(batch_size, -1, self.num_heads, self.head_dim).permute(0, 2, 1, 3)
# 计算注意力得分
energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / (self.head_dim ** 0.5)
# 应用mask
if mask is not None:
energy = energy.masked_fill(mask == 0, -1e10)
# 应用softmax
attention = torch.softmax(energy, dim=-1)
# 应用注意力
out = torch.matmul(attention, V)
out = out.permute(0, 2, 1, 3).contiguous()
out = out.view(batch_size, -1, self.d_model)
out = self.fc_out(out)
return out
3.2.3 前馈网络
每个Transformer块中的前馈网络包含两个线性变换,中间有ReLU激活函数:
class PositionwiseFeedforward(nn.Module):
def __init__(self, d_model, d_ff):
super().__init__()
self.fc1 = nn.Linear(d_model, d_ff)
self.fc2 = nn.Linear(d_ff, d_model)
def forward(self, x):
return self.fc2(F.relu(self.fc1(x)))
3.3 完整的Transformer模型
class TimeSeriesTransformer(nn.Module):
def __init__(self, num_features, embed_dim, num_heads, dense_dim, dropout_rate, num_blocks, output_sequence_length):
super().__init__()
self.embedding = nn.Linear(num_features, embed_dim)
self.pos_encoding = positional_encoding(look_back, embed_dim)
self.encoder_layers = nn.ModuleList([
TransformerEncoderLayer(embed_dim, num_heads, dense_dim, dropout_rate)
for _ in range(num_blocks)
])
self.decoder_layers = nn.ModuleList([
TransformerDecoderLayer(embed_dim, num_heads, dense_dim, dropout_rate)
for _ in range(num_blocks)
])
self.output_layer = nn.Linear(embed_dim, 1)
self.output_sequence_length = output_sequence_length
def forward(self, x):
# x shape: [batch, seq_len, features]
batch_size = x.shape[0]
# 嵌入输入序列
x = self.embedding(x) # [batch, seq_len, embed_dim]
# 添加位置编码
x = x + self.pos_encoding.to(x.device)
# 编码器
for layer in self.encoder_layers:
x = layer(x)
# 解码器初始输入
decoder_input = x[:, -1:, :] # 使用编码器的最后一个时间步作为初始输入
outputs = []
# 自回归解码
for _ in range(self.output_sequence_length):
for layer in self.decoder_layers:
decoder_input = layer(decoder_input, x)
output = self.output_layer(decoder_input)
outputs.append(output)
# 更新解码器输入
decoder_input = torch.cat([decoder_input, output.unsqueeze(-1)], dim=1)
decoder_input = decoder_input[:, -1:, :]
# 合并所有预测结果
outputs = torch.cat(outputs, dim=1) # [batch, output_seq_len, 1]
return outputs.squeeze(-1) # [batch, output_seq_len]
4. 矩阵LSTM模型 (mLSTM)
4.1 标准LSTM限制
标准LSTM使用标量门控单元和隐藏状态,这限制了其表达能力,特别是在处理具有复杂依赖关系的数据时。
4.2 mLSTM创新点
矩阵LSTM (mLSTM) 使用矩阵代替标量来表示门控单元和隐藏状态,从而提高了模型的表达能力。主要创新点:
- 使用矩阵运算代替标量运算
- 隐藏状态表示为矩阵而非向量
- 使用矩阵乘法而非向量点积
4.3 mLSTM单元实现
class MLSTMCell(nn.Module):
def __init__(self, input_size, hidden_size, matrix_size):
super().__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.matrix_size = matrix_size
# 输入门参数
self.W_xi = nn.Parameter(torch.Tensor(input_size, hidden_size * matrix_size))
self.W_hi = nn.Parameter(torch.Tensor(hidden_size * matrix_size, hidden_size * matrix_size))
self.b_i = nn.Parameter(torch.Tensor(hidden_size * matrix_size))
# 遗忘门参数
self.W_xf = nn.Parameter(torch.Tensor(input_size, hidden_size * matrix_size))
self.W_hf = nn.Parameter(torch.Tensor(hidden_size * matrix_size, hidden_size * matrix_size))
self.b_f = nn.Parameter(torch.Tensor(hidden_size * matrix_size))
# 单元状态参数
self.W_xc = nn.Parameter(torch.Tensor(input_size, hidden_size * matrix_size))
self.W_hc = nn.Parameter(torch.Tensor(hidden_size * matrix_size, hidden_size * matrix_size))
self.b_c = nn.Parameter(torch.Tensor(hidden_size * matrix_size))
# 输出门参数
self.W_xo = nn.Parameter(torch.Tensor(input_size, hidden_size * matrix_size))
self.W_ho = nn.Parameter(torch.Tensor(hidden_size * matrix_size, hidden_size * matrix_size))
self.b_o = nn.Parameter(torch.Tensor(hidden_size * matrix_size))
self.init_weights()
def init_weights(self):
for p in self.parameters():
if p.data.ndimension() >= 2:
nn.init.xavier_uniform_(p.data)
else:
nn.init.zeros_(p.data)
def forward(self, x, hidden):
h, c = hidden
# 将隐藏状态矩阵展平为向量
h_flat = h.view(h.size(0), -1)
# 计算门控值
i = torch.sigmoid(x @ self.W_xi + h_flat @ self.W_hi + self.b_i)
f = torch.sigmoid(x @ self.W_xf + h_flat @ self.W_hf + self.b_f)
o = torch.sigmoid(x @ self.W_xo + h_flat @ self.W_ho + self.b_o)
# 计算候选单元状态
c_tilde = torch.tanh(x @ self.W_xc + h_flat @ self.W_hc + self.b_c)
# 更新单元状态
c_flat = f * c.view(c.size(0), -1) + i * c_tilde
c_new = c_flat.view(c.size(0), self.hidden_size, self.matrix_size)
# 计算新的隐藏状态
h_new = o * torch.tanh(c_flat)
h_new = h_new.view(h.size(0), self.hidden_size, self.matrix_size)
return h_new, c_new
4.4 mLSTM结合Transformer的混合模型
系统中的mLSTM模型与Transformer结合,形成强大的混合架构:
class MLSTMTransformer(nn.Module):
def __init__(self, num_features, hidden_size, mlstm_layers, embed_dim, dense_dim, num_heads, dropout_rate, num_blocks, output_sequence_length):
super().__init__()
self.matrix_size = 4 # 矩阵维度
self.mlstm = nn.ModuleList([
MLSTMLayer(num_features if i == 0 else hidden_size * self.matrix_size,
hidden_size,
self.matrix_size)
for i in range(mlstm_layers)
])
# Transformer部分
self.embedding = nn.Linear(hidden_size * self.matrix_size, embed_dim)
self.transformer_blocks = nn.ModuleList([
TransformerBlock(embed_dim, num_heads, dense_dim, dropout_rate)
for _ in range(num_blocks)
])
self.output_layer = nn.Linear(embed_dim, 1)
self.output_sequence_length = output_sequence_length
self.dropout = nn.Dropout(dropout_rate)
def forward(self, x):
batch_size, seq_len, _ = x.shape
# mLSTM处理
h_t = torch.zeros(batch_size, self.hidden_size, self.matrix_size).to(x.device)
c_t = torch.zeros(batch_size, self.hidden_size, self.matrix_size).to(x.device)
outputs = []
for t in range(seq_len):
x_t = x[:, t, :]
for layer in self.mlstm:
h_t, c_t = layer(x_t, (h_t, c_t))
x_t = h_t.view(batch_size, -1)
outputs.append(h_t.view(batch_size, 1, -1))
# 合并所有时间步的输出
mlstm_output = torch.cat(outputs, dim=1)
# Transformer处理
transformer_input = self.embedding(mlstm_output.view(batch_size, seq_len, -1))
transformer_input = self.dropout(transformer_input)
for block in self.transformer_blocks:
transformer_input = block(transformer_input)
# 输出层
decoder_input = transformer_input[:, -1:, :]
predictions = []
# 自回归生成预测序列
for _ in range(self.output_sequence_length):
for block in self.transformer_blocks:
decoder_input = block(decoder_input)
pred = self.output_layer(decoder_input)
predictions.append(pred)
# 更新解码器输入
embedding = self.embedding(pred.repeat(1, 1, self.hidden_size * self.matrix_size))
decoder_input = embedding
# 合并所有预测
predictions = torch.cat(predictions, dim=1)
return predictions.squeeze(-1)
5. Kolmogorov-Arnold网络 (KAN)
5.1 理论基础
KAN基于柯尔莫哥洛夫-阿诺尔德表示定理,该定理表明任何连续多变量函数都可以表示为单变量函数的有限复合。
5.2 KAN架构
KAN使用B样条基函数作为激活函数,通过学习这些基函数的组合来近似任意复杂的函数:
class KANLayer(nn.Module):
def __init__(self, in_features, out_features, grid_size=10, spline_order=3):
super().__init__()
self.in_features = in_features
self.out_features = out_features
self.grid_size = grid_size
self.spline_order = spline_order
# 网格点
self.grid = nn.Parameter(torch.linspace(-1, 1, grid_size))
# 控制点权重
self.weights = nn.Parameter(torch.Tensor(in_features, out_features, grid_size))
# 初始化
nn.init.xavier_uniform_(self.weights)
def b_spline(self, x, idx, order):
"""计算B样条基函数"""
if order == 0:
return ((x >= self.grid[idx]) & (x < self.grid[idx+1])).float()
# 递归计算
w1 = (x - self.grid[idx]) / (self.grid[idx+order] - self.grid[idx] + 1e-7)
w2 = (self.grid[idx+order+1] - x) / (self.grid[idx+order+1] - self.grid[idx+1] + 1e-7)
return w1 * self.b_spline(x, idx, order-1) + w2 * self.b_spline(x, idx+1, order-1)
def forward(self, x):
batch_size = x.shape[0]
out = torch.zeros(batch_size, self.out_features).to(x.device)
# 对每个输入特征计算B样条
for i in range(self.in_features):
for j in range(self.grid_size - self.spline_order - 1):
# 计算样条基函数
basis = self.b_spline(x[:, i].unsqueeze(1), j, self.spline_order)
# 应用权重
out += basis * self.weights[i, :, j].unsqueeze(0)
return out
5.3 KAN预测模型
class KANForecaster(nn.Module):
def __init__(self, num_features, hidden_sizes=[64, 32], grid_size=10, spline_order=3, output_sequence_length=7):
super().__init__()
self.output_sequence_length = output_sequence_length
# 输入层
self.input_layer = nn.Linear(num_features * look_back, hidden_sizes[0])
# KAN层
self.kan_layers = nn.ModuleList()
for i in range(len(hidden_sizes) - 1):
self.kan_layers.append(KANLayer(hidden_sizes[i], hidden_sizes[i+1], grid_size, spline_order))
# 输出层
self.output_layer = nn.Linear(hidden_sizes[-1], output_sequence_length)
def forward(self, x):
batch_size = x.shape[0]
# 展平输入序列
x = x.view(batch_size, -1)
# 输入层
x = F.relu(self.input_layer(x))
# KAN层
for layer in self.kan_layers:
x = layer(x)
# 输出层
output = self.output_layer(x)
return output
6. 模型比较
6.1 各模型优势
模型 | 优势 | 适用场景 |
---|---|---|
Transformer | 并行计算能力强,可以捕捉长期依赖关系 | 数据量大,有明显的季节性模式 |
mLSTM | 增强的记忆能力,能处理复杂序列关系 | 数据有复杂的时序依赖,存在不规则波动 |
KAN | 可解释性强,能自适应学习复杂非线性关系 | 数据具有复杂的非线性特征,样本量相对较小 |
6.2 性能对比
根据实验结果,不同模型在不同类型的药品销售预测中表现各异:
- 季节性强的药品:Transformer通常表现最佳
- 突发性销售的药品:mLSTM能更好地捕捉突变点
- 非线性关系明显的药品:KAN往往有更高的预测精度
7. 模型评估指标
系统使用多种指标评估模型性能:
- MSE (均方误差): 预测值与实际值差的平方的平均值
- RMSE (均方根误差): MSE的平方根,与原始数据单位相同
- MAE (平均绝对误差): 预测值与实际值绝对差的平均值
- R² (决定系数): 模型解释的方差比例,范围通常为0-1
- MAPE (平均绝对百分比误差): 预测值与实际值绝对差的百分比平均值
def evaluate_model(y_true, y_pred):
"""
评估模型性能
参数:
y_true: 真实值
y_pred: 预测值
返回:
包含各评估指标的字典
"""
mse = mean_squared_error(y_true, y_pred)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_true, y_pred)
r2 = r2_score(y_true, y_pred)
# 计算MAPE,避免除零错误
mask = y_true != 0
mape = np.mean(np.abs((y_true[mask] - y_pred[mask]) / y_true[mask])) * 100
return {
'mse': mse,
'rmse': rmse,
'mae': mae,
'r2': r2,
'mape': mape
}
8. 模型持久化
系统使用PyTorch的序列化机制保存和加载模型:
def save_model(model, product_id, model_type, metrics):
"""保存模型和相关信息"""
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
version = timestamp
# 创建保存目录
save_dir = f"predictions/{model_type}/{product_id}"
os.makedirs(save_dir, exist_ok=True)
# 保存模型
model_path = f"{save_dir}/{version}.pt"
torch.save({
'model_state_dict': model.state_dict(),
'metrics': metrics,
'created_at': timestamp,
'product_id': product_id,
'model_type': model_type,
'version': version
}, model_path)
return model_path
def load_model(product_id, model_type, version='latest'):
"""加载模型"""
model_dir = f"predictions/{model_type}/{product_id}"
if not os.path.exists(model_dir):
return None
if version == 'latest':
# 获取最新版本
model_files = glob.glob(f"{model_dir}/*.pt")
if not model_files:
return None
model_path = max(model_files, key=os.path.getctime)
else:
model_path = f"{model_dir}/{version}.pt"
if not os.path.exists(model_path):
return None
# 加载模型
checkpoint = torch.load(model_path)
# 根据模型类型创建相应的模型实例
if model_type == 'transformer':
model = TimeSeriesTransformer(...)
elif model_type == 'mlstm':
model = MLSTMTransformer(...)
elif model_type == 'kan':
model = KANForecaster(...)
else:
return None
model.load_state_dict(checkpoint['model_state_dict'])
model.eval()
return model, checkpoint
9. 未来模型优化方向
9.1 模型改进
- 注意力机制优化: 引入多尺度注意力,更好地捕捉不同时间粒度的模式
- 集成学习: 结合多个模型的预测结果,提高整体预测精度
- 贝叶斯优化: 自动调整超参数,找到最优模型配置
9.2 特征工程增强
- 时间分解: 将时间序列分解为趋势、季节性和残差组件
- 外部特征整合: 引入更多外部因素,如疫情指数、网络搜索热度等
- 特征自动选择: 使用特征重要性评估,自动选择最相关的特征
9.3 可解释性增强
- 注意力可视化: 展示模型在预测时关注的历史数据点
- 局部可解释性: 使用SHAP或LIME等技术解释单个预测
- 规则提取: 从训练好的模型中提取简单的决策规则