PyTorch:使用 LSTM 预测未来值
PyTorch: Predicting future values with LSTM
我目前正在使用 PyTorch 构建 LSTM 模型来预测时间序列数据。我使用滞后特征将前面的 n 个步骤作为输入来训练网络。我将数据分成三组,即训练-验证-测试拆分,并使用前两组来训练模型。我的验证函数从验证数据集中获取数据,并通过使用 DataLoaders 和 TensorDataset classes 将其传递给 LSTM 模型来计算预测值。最初,我在 0.85-0.95 范围内的 R2 值得到了很好的结果。
但是,我对这个验证函数是否也适用于测试我的模型的性能有一种不安的感觉。因为函数现在使用来自 DataLoader 的实际 X 值,即时滞特征来预测 y^ 值,即预测目标值,而不是使用预测的 y^ 值作为下一个预测中的特征。这种情况似乎与现实相去甚远,因为模型不知道先前时间步长的实际值,特别是如果您预测较长时间段的时间序列数据,比如 3-6 个月。
我目前对解决这个问题并定义一个函数来根据模型的值而不是测试集中的实际值来预测未来值感到有些困惑。我有以下函数predict
,它进行一步预测,但我还没有真正弄清楚如何使用 DataLoader 预测整个测试数据集。
def predict(self, x):
# convert row to data
x = x.to(device)
# make prediction
yhat = self.model(x)
# retrieve numpy array
yhat = yhat.to(device).detach().numpy()
return yhat
您可以在下面找到我如何拆分和加载我的数据集、我的 LSTM 模型构造函数和验证函数。如果您需要更多信息,请随时与我联系。
拆分和加载数据集
def create_tensor_datasets(X_train_arr, X_val_arr, X_test_arr, y_train_arr, y_val_arr, y_test_arr):
train_features = torch.Tensor(X_train_arr)
train_targets = torch.Tensor(y_train_arr)
val_features = torch.Tensor(X_val_arr)
val_targets = torch.Tensor(y_val_arr)
test_features = torch.Tensor(X_test_arr)
test_targets = torch.Tensor(y_test_arr)
train = TensorDataset(train_features, train_targets)
val = TensorDataset(val_features, val_targets)
test = TensorDataset(test_features, test_targets)
return train, val, test
def load_tensor_datasets(train, val, test, batch_size=64, shuffle=False, drop_last=True):
train_loader = DataLoader(train, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last)
val_loader = DataLoader(val, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last)
test_loader = DataLoader(test, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last)
return train_loader, val_loader, test_loader
Class LSTM
class LSTMModel(nn.Module):
def __init__(self, input_dim, hidden_dim, layer_dim, output_dim, dropout_prob):
super(LSTMModel, self).__init__()
self.hidden_dim = hidden_dim
self.layer_dim = layer_dim
self.lstm = nn.LSTM(
input_dim, hidden_dim, layer_dim, batch_first=True, dropout=dropout_prob
)
self.fc = nn.Linear(hidden_dim, output_dim)
def forward(self, x, future=False):
h0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).requires_grad_()
c0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).requires_grad_()
out, (hn, cn) = self.lstm(x, (h0.detach(), c0.detach()))
out = out[:, -1, :]
out = self.fc(out)
return out
验证(在训练器中定义class)
def validation(self, val_loader, batch_size, n_features):
with torch.no_grad():
predictions = []
values = []
for x_val, y_val in val_loader:
x_val = x_val.view([batch_size, -1, n_features]).to(device)
y_val = y_val.to(device)
self.model.eval()
yhat = self.model(x_val)
predictions.append(yhat.cpu().detach().numpy())
values.append(y_val.cpu().detach().numpy())
return predictions, values
我终于找到了一种根据早期观察的预测值来预测值的方法。不出所料,预测在短期内相当准确,但在长期内会略有恶化。随着时间的推移,未来的预测会出现偏差,这并不奇怪,因为它们不再依赖于实际值。回顾我的结果和我就该主题进行的讨论,以下是我的收获:
在现实生活中,可以在预测的每个步骤(每周、每天或每小时)检索真实值并将其输入模型,以便下一步可以用上一步的实际值预测。因此,根据测试集的实际值来测试性能可能会在一定程度上反映定期维护的模型的真实性能。
然而,为了预测长期的未来价值,如果你愿意的话,你需要做出跨越你希望的时间段的多个一步预测或多步预测预测。
根据模型预测的值进行多次一步预测会在短期内产生合理的结果。随着预测周期的增加,预测变得越来越不准确,因此越来越不符合预测的目的。
要进行多个一步预测并在每次预测后更新输入,我们必须一个接一个地处理数据集,就好像我们正在对数据集进行 for 循环一样测试集。毫不奇怪,这使我们失去了矩阵运算和小批量训练为我们提供的所有计算优势。
另一种方法是预测值序列,而不是仅预测下一个值,例如使用具有多对多或序列到序列结构的多维输出的 RNN。它们可能更难训练,并且在针对不同时间段做出预测时更不灵活。编码器-解码器结构可能对解决这个问题很有用,尽管我自己还没有实现它。
您可以找到我的函数的代码,它根据数据集的最后一行 X
(时滞特征)和 y
(目标值)预测下一个 n_steps
).要遍历数据集中的每一行,我会将 batch_size
设置为 1,将 n_features
设置为滞后观测值的数量。
def forecast(self, X, y, batch_size=1, n_features=1, n_steps=100):
predictions = []
X = torch.roll(X, shifts=1, dims=2)
X[..., -1, 0] = y.item(0)
with torch.no_grad():
self.model.eval()
for _ in range(n_steps):
X = X.view([batch_size, -1, n_features]).to(device)
yhat = self.model(X)
yhat = yhat.to(device).detach().numpy()
X = torch.roll(X, shifts=1, dims=2)
X[..., -1, 0] = yhat.item(0)
predictions.append(yhat)
return predictions
以下行将张量的第二维值移动一个,以便张量 [[[x1, x2, x3, ... , xn ]]]
变为 [[[xn, x1, x2, ... , x(n-1)]]]
。
X = torch.roll(X, shifts=1, dims=2)
并且,下面的行从 3d 张量的最后一个维度中选择第一个元素,并将该项目设置为存储在 NumPy ndarray (yhat) 中的预测值,[[xn+1]]
。那么,新的输入张量就变成了[[[x(n+1), x1, x2, ... , x(n-1)]]]
X[..., -1, 0] = yhat.item(0)
最近,我决定把我学到的东西和我早先想知道的东西放在一起。如果您想看一看,可以在下面找到链接。我希望你会发现它有用。如果您同意或不同意我上面的任何评论,请随时发表评论或与我联系。
我目前正在使用 PyTorch 构建 LSTM 模型来预测时间序列数据。我使用滞后特征将前面的 n 个步骤作为输入来训练网络。我将数据分成三组,即训练-验证-测试拆分,并使用前两组来训练模型。我的验证函数从验证数据集中获取数据,并通过使用 DataLoaders 和 TensorDataset classes 将其传递给 LSTM 模型来计算预测值。最初,我在 0.85-0.95 范围内的 R2 值得到了很好的结果。
但是,我对这个验证函数是否也适用于测试我的模型的性能有一种不安的感觉。因为函数现在使用来自 DataLoader 的实际 X 值,即时滞特征来预测 y^ 值,即预测目标值,而不是使用预测的 y^ 值作为下一个预测中的特征。这种情况似乎与现实相去甚远,因为模型不知道先前时间步长的实际值,特别是如果您预测较长时间段的时间序列数据,比如 3-6 个月。
我目前对解决这个问题并定义一个函数来根据模型的值而不是测试集中的实际值来预测未来值感到有些困惑。我有以下函数predict
,它进行一步预测,但我还没有真正弄清楚如何使用 DataLoader 预测整个测试数据集。
def predict(self, x):
# convert row to data
x = x.to(device)
# make prediction
yhat = self.model(x)
# retrieve numpy array
yhat = yhat.to(device).detach().numpy()
return yhat
您可以在下面找到我如何拆分和加载我的数据集、我的 LSTM 模型构造函数和验证函数。如果您需要更多信息,请随时与我联系。
拆分和加载数据集
def create_tensor_datasets(X_train_arr, X_val_arr, X_test_arr, y_train_arr, y_val_arr, y_test_arr):
train_features = torch.Tensor(X_train_arr)
train_targets = torch.Tensor(y_train_arr)
val_features = torch.Tensor(X_val_arr)
val_targets = torch.Tensor(y_val_arr)
test_features = torch.Tensor(X_test_arr)
test_targets = torch.Tensor(y_test_arr)
train = TensorDataset(train_features, train_targets)
val = TensorDataset(val_features, val_targets)
test = TensorDataset(test_features, test_targets)
return train, val, test
def load_tensor_datasets(train, val, test, batch_size=64, shuffle=False, drop_last=True):
train_loader = DataLoader(train, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last)
val_loader = DataLoader(val, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last)
test_loader = DataLoader(test, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last)
return train_loader, val_loader, test_loader
Class LSTM
class LSTMModel(nn.Module):
def __init__(self, input_dim, hidden_dim, layer_dim, output_dim, dropout_prob):
super(LSTMModel, self).__init__()
self.hidden_dim = hidden_dim
self.layer_dim = layer_dim
self.lstm = nn.LSTM(
input_dim, hidden_dim, layer_dim, batch_first=True, dropout=dropout_prob
)
self.fc = nn.Linear(hidden_dim, output_dim)
def forward(self, x, future=False):
h0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).requires_grad_()
c0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).requires_grad_()
out, (hn, cn) = self.lstm(x, (h0.detach(), c0.detach()))
out = out[:, -1, :]
out = self.fc(out)
return out
验证(在训练器中定义class)
def validation(self, val_loader, batch_size, n_features):
with torch.no_grad():
predictions = []
values = []
for x_val, y_val in val_loader:
x_val = x_val.view([batch_size, -1, n_features]).to(device)
y_val = y_val.to(device)
self.model.eval()
yhat = self.model(x_val)
predictions.append(yhat.cpu().detach().numpy())
values.append(y_val.cpu().detach().numpy())
return predictions, values
我终于找到了一种根据早期观察的预测值来预测值的方法。不出所料,预测在短期内相当准确,但在长期内会略有恶化。随着时间的推移,未来的预测会出现偏差,这并不奇怪,因为它们不再依赖于实际值。回顾我的结果和我就该主题进行的讨论,以下是我的收获:
在现实生活中,可以在预测的每个步骤(每周、每天或每小时)检索真实值并将其输入模型,以便下一步可以用上一步的实际值预测。因此,根据测试集的实际值来测试性能可能会在一定程度上反映定期维护的模型的真实性能。
然而,为了预测长期的未来价值,如果你愿意的话,你需要做出跨越你希望的时间段的多个一步预测或多步预测预测。
根据模型预测的值进行多次一步预测会在短期内产生合理的结果。随着预测周期的增加,预测变得越来越不准确,因此越来越不符合预测的目的。
要进行多个一步预测并在每次预测后更新输入,我们必须一个接一个地处理数据集,就好像我们正在对数据集进行 for 循环一样测试集。毫不奇怪,这使我们失去了矩阵运算和小批量训练为我们提供的所有计算优势。
另一种方法是预测值序列,而不是仅预测下一个值,例如使用具有多对多或序列到序列结构的多维输出的 RNN。它们可能更难训练,并且在针对不同时间段做出预测时更不灵活。编码器-解码器结构可能对解决这个问题很有用,尽管我自己还没有实现它。
您可以找到我的函数的代码,它根据数据集的最后一行 X
(时滞特征)和 y
(目标值)预测下一个 n_steps
).要遍历数据集中的每一行,我会将 batch_size
设置为 1,将 n_features
设置为滞后观测值的数量。
def forecast(self, X, y, batch_size=1, n_features=1, n_steps=100):
predictions = []
X = torch.roll(X, shifts=1, dims=2)
X[..., -1, 0] = y.item(0)
with torch.no_grad():
self.model.eval()
for _ in range(n_steps):
X = X.view([batch_size, -1, n_features]).to(device)
yhat = self.model(X)
yhat = yhat.to(device).detach().numpy()
X = torch.roll(X, shifts=1, dims=2)
X[..., -1, 0] = yhat.item(0)
predictions.append(yhat)
return predictions
以下行将张量的第二维值移动一个,以便张量 [[[x1, x2, x3, ... , xn ]]]
变为 [[[xn, x1, x2, ... , x(n-1)]]]
。
X = torch.roll(X, shifts=1, dims=2)
并且,下面的行从 3d 张量的最后一个维度中选择第一个元素,并将该项目设置为存储在 NumPy ndarray (yhat) 中的预测值,[[xn+1]]
。那么,新的输入张量就变成了[[[x(n+1), x1, x2, ... , x(n-1)]]]
X[..., -1, 0] = yhat.item(0)
最近,我决定把我学到的东西和我早先想知道的东西放在一起。如果您想看一看,可以在下面找到链接。我希望你会发现它有用。如果您同意或不同意我上面的任何评论,请随时发表评论或与我联系。