import torch
import torch.nn as nn


# Define the Transformer encoder: multi-head self-attention followed by a
# position-wise feed-forward network, each with a residual connection and LayerNorm
class TransformerEncoder(nn.Module):
    def __init__(self, embed_dim, dense_dim, num_heads, dropout_rate):
        super().__init__()
        self.attention = nn.MultiheadAttention(embed_dim, num_heads, dropout=dropout_rate, batch_first=True)
        self.dense1 = nn.Linear(embed_dim, dense_dim)
        self.dense2 = nn.Linear(dense_dim, embed_dim)
        self.layernorm1 = nn.LayerNorm(embed_dim)
        self.layernorm2 = nn.LayerNorm(embed_dim)
        self.dropout1 = nn.Dropout(dropout_rate)
        self.dropout2 = nn.Dropout(dropout_rate)
        self.activation = nn.ReLU()

    def forward(self, inputs):
        # Self-attention sublayer with residual connection and LayerNorm
        attn_output, _ = self.attention(inputs, inputs, inputs)
        attn_output = self.dropout1(attn_output)
        out1 = self.layernorm1(inputs + attn_output)

        # Feed-forward sublayer with residual connection and LayerNorm
        dense_output = self.dense1(out1)
        dense_output = self.activation(dense_output)
        dense_output = self.dense2(dense_output)
        dense_output = self.dropout2(dense_output)

        return self.layernorm2(out1 + dense_output)

# Define the Transformer decoder: self-attention (no causal mask applied), cross-attention
# over the encoder outputs, and a feed-forward network, each with a residual connection and LayerNorm
class TransformerDecoder(nn.Module):
    def __init__(self, embed_dim, dense_dim, num_heads, dropout_rate):
        super().__init__()
        self.attention1 = nn.MultiheadAttention(embed_dim, num_heads, dropout=dropout_rate, batch_first=True)
        self.attention2 = nn.MultiheadAttention(embed_dim, num_heads, dropout=dropout_rate, batch_first=True)
        self.dense1 = nn.Linear(embed_dim, dense_dim)
        self.dense2 = nn.Linear(dense_dim, embed_dim)
        self.layernorm1 = nn.LayerNorm(embed_dim)
        self.layernorm2 = nn.LayerNorm(embed_dim)
        self.layernorm3 = nn.LayerNorm(embed_dim)
        self.dropout1 = nn.Dropout(dropout_rate)
        self.dropout2 = nn.Dropout(dropout_rate)
        self.dropout3 = nn.Dropout(dropout_rate)
        self.activation = nn.ReLU()

    def forward(self, inputs, encoder_outputs):
        # Self-attention sublayer over the decoder inputs
        attn_output1, _ = self.attention1(inputs, inputs, inputs)
        attn_output1 = self.dropout1(attn_output1)
        out1 = self.layernorm1(inputs + attn_output1)

        # Cross-attention sublayer attending to the encoder outputs
        attn_output2, _ = self.attention2(out1, encoder_outputs, encoder_outputs)
        attn_output2 = self.dropout2(attn_output2)
        out2 = self.layernorm2(out1 + attn_output2)

        # Feed-forward sublayer
        dense_output = self.dense1(out2)
        dense_output = self.activation(dense_output)
        dense_output = self.dense2(dense_output)
        dense_output = self.dropout3(dense_output)

        return self.layernorm3(out2 + dense_output)

# Define the complete Transformer model for time-series forecasting
class TimeSeriesTransformer(nn.Module):
    # Model parameters (class attributes; not referenced elsewhere in this file)
    input_size = 64
    hidden_size = 128
    num_layers = 1

    def __init__(self, num_features, d_model=32, nhead=4, num_encoder_layers=3, dim_feedforward=32, dropout=0.1, output_sequence_length=1, seq_length=14, batch_size=32):
        super().__init__()
        # Map constructor arguments to internal names
        embed_dim = d_model
        dense_dim = dim_feedforward
        num_heads = nhead
        dropout_rate = dropout
        num_blocks = num_encoder_layers

        self.input_embedding = nn.Linear(num_features, embed_dim)
        self.positional_encoding = nn.Parameter(torch.randn(1, 1000, embed_dim) * 0.1)  # learnable positional encoding, long enough for any expected sequence

        self.encoders = nn.ModuleList([TransformerEncoder(embed_dim, dense_dim, num_heads, dropout_rate) for _ in range(num_blocks)])
        self.decoders = nn.ModuleList([TransformerDecoder(embed_dim, dense_dim, num_heads, dropout_rate) for _ in range(num_blocks)])

        self.output_layer = nn.Linear(embed_dim, 1)  # output a single sales-volume prediction per time step
        self.dropout = nn.Dropout(dropout_rate)
        self.output_sequence_length = output_sequence_length
        self.seq_length = seq_length
        self.batch_size = batch_size

    def forward(self, inputs):
        # inputs shape: [batch_size, sequence_length, num_features]
        x = self.input_embedding(inputs)
        x = x + self.positional_encoding[:, :x.size(1), :]
        x = self.dropout(x)

        # Encoder stack
        encoder_outputs = x
        for encoder in self.encoders:
            encoder_outputs = encoder(encoder_outputs)

        # Decoder stack: use the last encoder time step as the starting decoder input
        decoder_inputs = encoder_outputs[:, -1:, :].expand(-1, self.output_sequence_length, -1)
        decoder_outputs = decoder_inputs

        for decoder in self.decoders:
            decoder_outputs = decoder(decoder_outputs, encoder_outputs)

        return self.output_layer(decoder_outputs)  # [batch_size, output_sequence_length, 1]
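

# --- Illustrative usage (not part of the original module) ---
# A minimal smoke test sketch: the feature count, window length, and batch size below
# are assumed placeholders and should be replaced with the real dataset's dimensions.
if __name__ == "__main__":
    batch_size, seq_length, num_features = 8, 14, 10  # assumed example sizes
    model = TimeSeriesTransformer(num_features=num_features, output_sequence_length=1, seq_length=seq_length)
    dummy_inputs = torch.randn(batch_size, seq_length, num_features)
    with torch.no_grad():
        predictions = model(dummy_inputs)
    print(predictions.shape)  # expected: torch.Size([8, 1, 1])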