如何自定义 LSTM 损失函数以仅考虑给定索引范围的预测和目标序列?
How to customize an LSTM loss function to only concider a given index range of prediction and target sequence?
我目前正在使用 LSTM 序列到序列模型进行时域信号预测。由于领域知识,我知道预测的第一部分(大约 20%)永远无法正确预测,因为所需的信息在给定的输入序列中不可用。其余 80% 的预测序列通常预测得很好。为了从训练优化中排除前 20%,最好定义一个基本上在给定索引范围内运行的损失函数,如下面的 numpy 代码:
start = int(0.2*sequence_length)
stop = sequence_length
def mse(pred, target):
""" Mean squared error between two time series np.arrays """
return 1/target.shape[0]*np.sum((pred-target)**2)
def range_mse_loss(y_pred, y):
return mse(y_pred[start:stop],y[start:stop])
如何编写此损失函数才能使其与我先前存在的 keras 代码一起使用,其中损失仅由 model.compile(loss='mse')
给出?
您可以将张量切分到最后 80% 的数据。
size = int(y_true.shape[0] * 0.8) # for 2D vector, e.g., (100, 1)
loss_fn = tf.keras.losses.MeanSquaredError(name='mse')
loss_fn(y_pred[:-size], y_true[:-size])
您还可以在 tf.keras.losses.MeanSquaredError() 处使用 sample_weights,传递一个权重数组,权重的前 20% 为零
size = int(y_true.shape[0] * 0.8) # for 2D vector, e.g., (100, 1)
zeros = tf.zeros((y_true.shape[0] - size), dtype=tf.int32)
ones = tf.ones((size), dtype=tf.int32)
weights = tf.concat([zeros, ones], 0)
loss_fn = tf.keras.losses.MeanSquaredError(name='mse')
loss_fn(y_pred, y_true, sample_weights=weights)
第二个解决方案变暖了,最终损失将低于第一个解决方案,因为您在第一个预测值中输入了零,但没有在公式 MSE = 1 / 中删除它们n * sum((y-y_hat)^2).
一种解决方法是将观察结果标记为 None/nan,然后覆盖 train_step
方法。按照 tensorflow 的 tutorial 关于自定义 train_step
,你会做这样的事情
@tf.function
def train_step(keras_model, data):
print('custom train_step')
# Unpack the data. Its structure depends on your model and
# on what you pass to `fit()`.
x, y = data
with tf.GradientTape() as tape:
y_pred = keras_model(x, training=True) # Forward pass
# masking nan values in observations, also assuming that targets are >0.0
mask = tf.greater(y, 0.0)
true_y = tf.boolean_mask(y, mask)
pred_y = tf.boolean_mask(y_pred, mask)
# Compute the loss value
# (the loss function is configured in `compile()`)
loss = keras_model.compiled_loss(true_y, pred_y, regularization_losses=keras_model.losses)
# Compute gradients
trainable_vars = keras_model.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
keras_model.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Update metrics (includes the metric that tracks the loss)
keras_model.compiled_metrics.update_state(true_y, pred_y)
# Return a dict mapping metric names to current value
return {m.name: m.result() for m in keras_model.metrics}
这将适用于您正在跟踪的所有绩效指标。另一种方法是在损失函数内屏蔽 nans,但这仅限于一个损失函数,而不是任何其他损失 function/performance 指标。
我目前正在使用 LSTM 序列到序列模型进行时域信号预测。由于领域知识,我知道预测的第一部分(大约 20%)永远无法正确预测,因为所需的信息在给定的输入序列中不可用。其余 80% 的预测序列通常预测得很好。为了从训练优化中排除前 20%,最好定义一个基本上在给定索引范围内运行的损失函数,如下面的 numpy 代码:
start = int(0.2*sequence_length)
stop = sequence_length
def mse(pred, target):
""" Mean squared error between two time series np.arrays """
return 1/target.shape[0]*np.sum((pred-target)**2)
def range_mse_loss(y_pred, y):
return mse(y_pred[start:stop],y[start:stop])
如何编写此损失函数才能使其与我先前存在的 keras 代码一起使用,其中损失仅由 model.compile(loss='mse')
给出?
您可以将张量切分到最后 80% 的数据。
size = int(y_true.shape[0] * 0.8) # for 2D vector, e.g., (100, 1)
loss_fn = tf.keras.losses.MeanSquaredError(name='mse')
loss_fn(y_pred[:-size], y_true[:-size])
您还可以在 tf.keras.losses.MeanSquaredError() 处使用 sample_weights,传递一个权重数组,权重的前 20% 为零
size = int(y_true.shape[0] * 0.8) # for 2D vector, e.g., (100, 1)
zeros = tf.zeros((y_true.shape[0] - size), dtype=tf.int32)
ones = tf.ones((size), dtype=tf.int32)
weights = tf.concat([zeros, ones], 0)
loss_fn = tf.keras.losses.MeanSquaredError(name='mse')
loss_fn(y_pred, y_true, sample_weights=weights)
第二个解决方案变暖了,最终损失将低于第一个解决方案,因为您在第一个预测值中输入了零,但没有在公式 MSE = 1 / 中删除它们n * sum((y-y_hat)^2).
一种解决方法是将观察结果标记为 None/nan,然后覆盖 train_step
方法。按照 tensorflow 的 tutorial 关于自定义 train_step
,你会做这样的事情
@tf.function
def train_step(keras_model, data):
print('custom train_step')
# Unpack the data. Its structure depends on your model and
# on what you pass to `fit()`.
x, y = data
with tf.GradientTape() as tape:
y_pred = keras_model(x, training=True) # Forward pass
# masking nan values in observations, also assuming that targets are >0.0
mask = tf.greater(y, 0.0)
true_y = tf.boolean_mask(y, mask)
pred_y = tf.boolean_mask(y_pred, mask)
# Compute the loss value
# (the loss function is configured in `compile()`)
loss = keras_model.compiled_loss(true_y, pred_y, regularization_losses=keras_model.losses)
# Compute gradients
trainable_vars = keras_model.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
keras_model.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Update metrics (includes the metric that tracks the loss)
keras_model.compiled_metrics.update_state(true_y, pred_y)
# Return a dict mapping metric names to current value
return {m.name: m.result() for m in keras_model.metrics}
这将适用于您正在跟踪的所有绩效指标。另一种方法是在损失函数内屏蔽 nans,但这仅限于一个损失函数,而不是任何其他损失 function/performance 指标。