How to customize an LSTM loss function to only consider a given index range of the prediction and target sequence?

I am currently working with an LSTM sequence-to-sequence model for time-domain signal prediction. From domain knowledge I know that the first part of the prediction (roughly the first 20%) can never be predicted correctly, since the information required for it is not available in the given input sequence. The remaining 80% of the predicted sequence is usually predicted quite well. To exclude the first 20% from the training optimization, it would be nice to define a loss function that essentially operates on a given index range, like the following numpy code:

import numpy as np

start = int(0.2 * sequence_length)
stop = sequence_length

def mse(pred, target):
    """Mean squared error between two time series np.arrays."""
    return np.sum((pred - target) ** 2) / target.shape[0]

def range_mse_loss(y_pred, y):
    return mse(y_pred[start:stop], y[start:stop])

How do I have to write this loss function so that it works with my pre-existing Keras code, where the loss is currently simply given by model.compile(loss='mse')?

You can slice the tensors so that only the last 80% of the data enters the loss.

import tensorflow as tf

size = int(y_true.shape[0] * 0.8)  # for a 2D vector, e.g., (100, 1)

loss_fn = tf.keras.losses.MeanSquaredError(name='mse')

loss_fn(y_true[-size:], y_pred[-size:])  # keep only the last 80% of the sequence
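To use this from model.compile as the question asks, you can wrap the slicing in a custom loss function. A minimal sketch, assuming batched targets of shape (batch, timesteps, features) and a sequence_length known in advance:

import tensorflow as tf

sequence_length = 100                 # assumed known in advance
start = int(0.2 * sequence_length)    # drop the first 20% of the time steps

def range_mse(y_true, y_pred):
    # slice both tensors along the time axis before computing the MSE
    return tf.reduce_mean(tf.square(y_true[:, start:, :] - y_pred[:, start:, :]))

# model.compile(optimizer='adam', loss=range_mse)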

You can also use the sample_weight argument of tf.keras.losses.MeanSquaredError(), passing an array of weights in which the first 20% are zero.

size = int(y_true.shape[0] * 0.8)  # for a 2D vector, e.g., (100, 1)

zeros = tf.zeros(y_true.shape[0] - size, dtype=tf.float32)
ones = tf.ones(size, dtype=tf.float32)

weights = tf.concat([zeros, ones], 0)

loss_fn = tf.keras.losses.MeanSquaredError(name='mse')

loss_fn(y_true, y_pred, sample_weight=weights)
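If you want the weighted variant to run through model.compile instead of calling the loss object by hand, one option is to bake the fixed weight vector into a custom loss. A sketch, again assuming targets of shape (batch, timesteps, features) and a known sequence_length:

import tensorflow as tf

sequence_length = 100
n_masked = int(0.2 * sequence_length)   # first 20% gets zero weight

weights = tf.concat(
    [tf.zeros(n_masked), tf.ones(sequence_length - n_masked)], axis=0)

def weighted_mse(y_true, y_pred):
    sq_err = tf.square(y_true - y_pred)
    # broadcast the per-timestep weights over the batch and feature axes
    return tf.reduce_mean(sq_err * weights[tf.newaxis, :, tf.newaxis])

# model.compile(optimizer='adam', loss=weighted_mse)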

A word of warning on the second solution: its final loss will be lower than that of the first solution, because the zero weights null out the squared errors of the first predicted values but do not remove those entries from the n in the formula MSE = 1/n * sum((y - y_hat)^2).
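If you want the weighted loss to match the sliced one numerically, one fix (a sketch) is to sum the weighted errors and divide by the sum of the weights instead of n:

import tensorflow as tf

loss_fn = tf.keras.losses.MeanSquaredError(
    reduction=tf.keras.losses.Reduction.SUM)

def renormalized_loss(y_true, y_pred, weights):
    # SUM reduction returns the sum of the weighted squared errors;
    # dividing by the weight sum gives a mean over the kept entries only
    total = loss_fn(y_true, y_pred, sample_weight=weights)
    return total / tf.reduce_sum(tf.cast(weights, total.dtype))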

One workaround is to mark the observations as None/NaN and then override the train_step method. Following the TensorFlow tutorial on customizing train_step, you would do something like this:

@tf.function
def train_step(keras_model, data):
    print('custom train_step')  # Python print only runs at tf.function trace time
    # Unpack the data. Its structure depends on your model and
    # on what you pass to `fit()`.
    x, y = data

    with tf.GradientTape() as tape:
        y_pred = keras_model(x, training=True)  # Forward pass

        # mask out invalid observations: comparisons with NaN are False,
        # so tf.greater drops NaNs; this also assumes valid targets are > 0.0
        mask = tf.greater(y, 0.0)
        true_y = tf.boolean_mask(y, mask)
        pred_y = tf.boolean_mask(y_pred, mask)

        # Compute the loss value
        # (the loss function is configured in `compile()`)
        loss = keras_model.compiled_loss(true_y, pred_y, regularization_losses=keras_model.losses)

    # Compute gradients
    trainable_vars = keras_model.trainable_variables
    gradients = tape.gradient(loss, trainable_vars)
    # Update weights
    keras_model.optimizer.apply_gradients(zip(gradients, trainable_vars))
    # Update metrics (includes the metric that tracks the loss)
    keras_model.compiled_metrics.update_state(true_y, pred_y)
    # Return a dict mapping metric names to current value
    return {m.name: m.result() for m in keras_model.metrics}
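To get Keras to actually call this, the route from the same tutorial is to subclass keras.Model and override train_step. A minimal sketch that delegates to the function above (the architecture in the usage comment is only a placeholder assumption):

import tensorflow as tf

class MaskedModel(tf.keras.Model):
    def train_step(self, data):
        # resolves to the module-level train_step defined above
        return train_step(self, data)

# usage sketch:
# inputs = tf.keras.Input(shape=(sequence_length, n_features))
# outputs = tf.keras.layers.LSTM(32, return_sequences=True)(inputs)
# model = MaskedModel(inputs, outputs)
# model.compile(optimizer='adam', loss='mse')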

This will work with all the performance metrics you are tracking. The alternative would be to mask the NaNs inside the loss function, but that would be limited to one loss function and would not carry over to any other loss functions/performance metrics.
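For completeness, a sketch of that loss-only variant, masking the NaNs inside a custom loss function:

import tensorflow as tf

def nan_masked_mse(y_true, y_pred):
    # keep only the positions where the target is not NaN
    mask = tf.math.logical_not(tf.math.is_nan(y_true))
    true_y = tf.boolean_mask(y_true, mask)
    pred_y = tf.boolean_mask(y_pred, mask)
    return tf.reduce_mean(tf.square(true_y - pred_y))

# model.compile(optimizer='adam', loss=nan_masked_mse)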