Create a weighted MSE loss function in TensorFlow

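I want to train a recurrent neural network with TensorFlow. My model outputs a 1 x 100 vector for each training sample. Suppose y = [y_1, y_2, ..., y_100] is the output for my training sample x, and the expected output is y' = [y'_1, y'_2, ..., y'_100].

I want to write a custom loss function that computes the loss for this particular sample as follows: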

Loss =  1/sum(weights) * sqrt(w_1*(y_1-y'_1)^2 + ... + w_100*(y_100-y'_100)^2)

where weights = [w_1, ..., w_100] is a given array of weights.

Could someone help me implement such a custom loss function? (I also use mini-batches during training.)

You can implement a custom weighted MSE as follows:

import numpy as np
from tensorflow.keras import backend as K

def custom_mse(class_weights):
    def weighted_mse(gt, pred):
        # Formula:
        # (w_1*(y_1-y'_1)^2 + ... + w_100*(y_100-y'_100)^2) / sum(weights)
        # Note: the sum runs over every element of the mini-batch.
        return K.sum(class_weights * K.square(gt - pred)) / K.sum(class_weights)
    return weighted_mse

y_true  = np.array([[0., 1., 1., 0.], [0., 0., 1., 1.]])
y_pred  = np.array([[0., 1., 0., 1.], [1., 0., 1., 1.]])
weights = np.array([0.25, 0.50, 1., 0.75])

print(y_true.shape, y_pred.shape, weights.shape)
# (2, 4) (2, 4) (4,)
loss = custom_mse(class_weights=weights)
loss(y_true, y_pred).numpy()
# 0.8

Use it when compiling the model:

model.compile(loss=custom_mse(weights))

This computes the MSE using the supplied weight array. However, your question mentions sqrt, so I think you mean root MSE (RMSE). For that, you can use K.sqrt(K.sum(...)) / K.sum(...) inside the custom_mse function.
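As a sanity check on the arithmetic, the sketch below recomputes the example in plain NumPy, both for the weighted MSE and for the root variant. The function names weighted_mse_np and weighted_rmse_np are made up for illustration; this is not a Keras loss, only a reference to compare the TensorFlow result against.

```python
import numpy as np

def weighted_mse_np(gt, pred, weights):
    # Weighted squared errors summed over the whole batch,
    # divided by the sum of the weights.
    return np.sum(weights * np.square(gt - pred)) / np.sum(weights)

def weighted_rmse_np(gt, pred, weights):
    # Root variant: square root of the weighted sum, then divide by
    # sum(weights), mirroring K.sqrt(K.sum(...)) / K.sum(...).
    return np.sqrt(np.sum(weights * np.square(gt - pred))) / np.sum(weights)

y_true  = np.array([[0., 1., 1., 0.], [0., 0., 1., 1.]])
y_pred  = np.array([[0., 1., 0., 1.], [1., 0., 1., 1.]])
weights = np.array([0.25, 0.50, 1., 0.75])

print(weighted_mse_np(y_true, y_pred, weights))   # 0.8
print(weighted_rmse_np(y_true, y_pred, weights))  # approximately 0.5657
```

The weighted squared errors sum to 2.0 and the weights sum to 2.5, which gives 2.0 / 2.5 = 0.8 for the MSE and sqrt(2.0) / 2.5 for the root variant.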

FYI, you may also be interested in the class_weight and sample_weight arguments of Model.fit. From the source:

  • class_weight: Optional dictionary mapping class indices (integers) to a weight (float) value, used for weighting the loss function (during training only). This can be useful to tell the model to "pay more attention" to samples from an under-represented class.

  • sample_weight: Optional Numpy array of weights for the training samples, used for weighting the loss function (during training only). You can either pass a flat (1D) Numpy array with the same length as the input samples (1:1 mapping between weights and samples), or in the case of temporal data, you can pass a 2D array with shape (samples, sequence_length), to apply a different weight to every timestep of every sample. This argument is not supported when x is a dataset, generator, or keras.utils.Sequence instance, instead provide the sample_weights as the third element of x.
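To make the two arguments concrete, here is a small sketch of how such weights might be prepared before calling Model.fit. The inverse-frequency heuristic and the helper name balanced_class_weight are assumptions for illustration, not something Keras prescribes; Keras only expects a dict for class_weight and an array for sample_weight.

```python
import numpy as np

def balanced_class_weight(labels):
    # Hypothetical helper: weight each class by
    # n_samples / (n_classes * count), so that under-represented
    # classes receive larger weights.
    classes, counts = np.unique(labels, return_counts=True)
    n_samples, n_classes = len(labels), len(classes)
    return {int(c): n_samples / (n_classes * int(n))
            for c, n in zip(classes, counts)}

labels = np.array([0, 0, 0, 1])          # class 1 is under-represented
class_weight = balanced_class_weight(labels)

# A flat sample_weight array with one weight per sample (1:1 mapping),
# here simply looked up from the class weights.
sample_weight = np.array([class_weight[int(c)] for c in labels])

print(class_weight)          # class 0 gets 2/3, class 1 gets 2.0
print(sample_weight.shape)   # (4,)
```

These values would then be passed as model.fit(X, y, class_weight=class_weight) or model.fit(X, y, sample_weight=sample_weight).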

There is also loss_weights in Model.compile. From the source:

loss_weights: Optional list or dictionary specifying scalar coefficients (Python floats) to weight the loss contributions of different model outputs. The loss value that will be minimized by the model will then be the weighted sum of all individual losses, weighted by the loss_weights coefficients. If a list, it is expected to have a 1:1 mapping to the model's outputs. If a dict, it is expected to map output names (strings) to scalar coefficients.

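I would like to point out that, depending on your problem, you have two possibilities:

[1] If the weights are the same for all samples:

You can build a loss wrapper. Here is a dummy example: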

import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.models import Model

n_sample = 200
X = np.random.uniform(0,1, (n_sample,10))
y = np.random.uniform(0,1, (n_sample,100))
W = np.random.uniform(0,1, (100,)).astype('float32')

def custom_loss_wrapper(weights):
    def loss(true, pred):
        # weights are broadcast over the batch, so normalize by
        # sum(weights) times the batch size
        sum_weights = tf.reduce_sum(weights) * tf.cast(tf.shape(pred)[0], tf.float32)
        resid = tf.sqrt(tf.reduce_sum(weights * tf.square(true - pred)))
        return resid/sum_weights
    return loss

inp = Input((10,))
x = Dense(256)(inp)
pred = Dense(100)(x)

model = Model(inp, pred)
model.compile('adam', loss=custom_loss_wrapper(W))

model.fit(X, y, epochs=3)
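Note that the loss wrapper takes a single square root over the whole mini-batch. If the intent is instead to apply the question's formula to each sample separately and then average over the batch, the arithmetic could look like the NumPy sketch below. The name per_sample_loss_np is hypothetical; in TensorFlow the same reductions would presumably be written with tf.reduce_sum(..., axis=-1) and tf.reduce_mean.

```python
import numpy as np

def per_sample_loss_np(true, pred, weights):
    # Weighted squared error summed over the output dimension: shape (batch,)
    weighted_sq = np.sum(weights * np.square(true - pred), axis=-1)
    # The question's formula, applied to each sample on its own
    per_sample = np.sqrt(weighted_sq) / np.sum(weights)
    # Average over the mini-batch
    return per_sample.mean()

true    = np.array([[0., 1., 1., 0.], [0., 0., 1., 1.]])
pred    = np.array([[0., 1., 0., 1.], [1., 0., 1., 1.]])
weights = np.array([0.25, 0.50, 1., 0.75])

print(per_sample_loss_np(true, pred, weights))  # approximately 0.3646
```

Here the two samples contribute sqrt(1.75) / 2.5 and sqrt(0.25) / 2.5 respectively, and the result is their mean. Which of the two reductions is appropriate depends on how the loss is meant to behave across a batch.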

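[2] If the weights differ between samples:

You should build the model using add_loss so that the weights of each sample are taken into account dynamically. Here is a dummy example: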

n_sample = 200
X = np.random.uniform(0,1, (n_sample,10))
y = np.random.uniform(0,1, (n_sample,100))
W = np.random.uniform(0,1, (n_sample,100))

def custom_loss(true, pred, weights):
    sum_weights = tf.reduce_sum(weights)
    resid = tf.sqrt(tf.reduce_sum(weights * tf.square(true - pred)))
    return resid/sum_weights

inp = Input((10,))
true = Input((100,))
weights = Input((100,))
x = Dense(256)(inp)
pred = Dense(100)(x)

model = Model([inp,true,weights], pred)
model.add_loss(custom_loss(true, pred, weights))
model.compile('adam', loss=None)

model.fit([X,y,W], y=None, epochs=3)

When using add_loss, you must pass every tensor involved in the loss as an input layer and feed them into the loss computation.
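For intuition about what custom_loss computes with a per-sample weight matrix, here is the same arithmetic in NumPy on a tiny batch. custom_loss_np is a hypothetical stand-in for the TensorFlow function; the example shows that a sample whose weights are all zero has no influence on the loss.

```python
import numpy as np

def custom_loss_np(true, pred, weights):
    # weights has the same shape as true/pred:
    # one weight per sample and per output
    sum_weights = np.sum(weights)
    resid = np.sqrt(np.sum(weights * np.square(true - pred)))
    return resid / sum_weights

true = np.array([[0., 1., 1., 0.], [0., 0., 1., 1.]])
pred = np.array([[0., 1., 0., 1.], [1., 0., 1., 1.]])
W    = np.array([[1., 1., 1., 1.], [0., 0., 0., 0.]])  # second sample masked out

loss_a = custom_loss_np(true, pred, W)

# Changing the prediction for the masked sample leaves the loss unchanged
pred_b = pred.copy()
pred_b[1] = 5.0
loss_b = custom_loss_np(true, pred_b, W)

print(loss_a, loss_b)  # both approximately 0.3536
```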

At inference time you can compute predictions as usual, simply dropping the true values and the weights as inputs:

final_model = Model(model.input[0], model.output)
final_model.predict(X)