How to create joint loss with paired Dataset samples in Tensorflow Keras API?

I am trying to train an autoencoder with a constraint that forces one or more of the hidden/encoded nodes/neurons to have an interpretable value. My training approach uses paired images (though after training the model should operate on a single image) and a joint loss function that includes (1) a reconstruction loss for each image and (2) a comparison between the hidden/encoded vectors from each of the two images.

To make this clearer, I have created an analogous simple toy problem and model. In the toy problem, the autoencoder's input is a vector of length 3. The encoding uses one dense layer to compute the mean (a scalar) and another dense layer to compute some other representation of the vector (given my construction, it would likely just learn the identity matrix, i.e., copy the input vector). See the figure below. The lowest node of the hidden layer is intended to compute the mean of the input vector; the remaining hidden nodes are unconstrained, other than having to accommodate a reconstruction that matches the input.

The figure below shows how I wish to train the model using paired images. "MSE" is mean squared error, although the identity of the actual function is not important to the question I am asking here. The loss function is the sum of the reconstruction loss and the mean-estimation loss.
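Concretely, the total loss I intend for a pair of inputs is (in pseudocode; this matches the loss_total function in the code below):

loss = MSE(vec1, reconstructed_vec1) / 2 \
     + MSE(vec2, reconstructed_vec2) / 2 \
     + MSE(true_mean_diff, predicted_mean_diff)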

I have attempted to create (1) a tf.data.Dataset that generates the paired vectors, (2) a Keras model, and (3) a custom loss function. However, I am failing to understand how to do this correctly for this particular case.

I cannot get Model.fit() to run correctly, nor to associate the model outputs with the Dataset targets as intended. See the code and error below. Can anyone help? I have done many Google and Stack Overflow searches and still do not understand how to make this work.

import tensorflow as tf
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' 

DTYPE = tf.dtypes.float32
N_VEC = 3

def my_generator(n):
    while True:
        # Create two vectors of length n that are identical except for
        # having different means.
        # An internal layer (single neuron) of the model should predict the
        # mean of the input vector. To train it to do so, with paired
        # vector inputs, use a loss function that penalizes incorrect
        # predictions of the difference of the means of two input vectors.
        input_vec1 = tf.random.normal((n,), dtype=DTYPE)
        target_mean_diff = tf.random.normal((1,), dtype=DTYPE)
        input_vec2 = input_vec1 + target_mean_diff
        
        # Model is a constrained autoencoder. Output targets are
        # identical to the input vectors. Including them as explicit
        # targets in this generator, for generalization.
        target_vec1 = tf.identity(input_vec1)
        target_vec2 = tf.identity(input_vec2)
        
        yield ({'input_vec1':input_vec1,
                'input_vec2':input_vec2},
               {'target_vec1':target_vec1,
                'target_vec2':target_vec2,
                'target_mean_diff':target_mean_diff})

def my_dataset(n, batch_size=4):
    ds = tf.data.Dataset.from_generator(my_generator,
                                        output_signature=({'input_vec1':tf.TensorSpec(shape=(n,), dtype=DTYPE),
                                                           'input_vec2':tf.TensorSpec(shape=(n,), dtype=DTYPE)},
                                                          {'target_vec1':tf.TensorSpec(shape=(n,), dtype=DTYPE),
                                                           'target_vec2':tf.TensorSpec(shape=(n,), dtype=DTYPE),
                                                           'target_mean_diff':tf.TensorSpec(shape=(1,), dtype=DTYPE)}),
                                        args=(n,))
    ds = ds.batch(batch_size)    
    return ds


## Do a brief test using the Dataset
ds = my_dataset(N_VEC, batch_size=4)
ds_iter = iter(ds)
dict_inputs, dict_targets = next(ds_iter)
print(dict_inputs)
print(dict_targets)


## Define the Model
layer_encode_vec = tf.keras.layers.Dense(N_VEC, activation=None, name='encode_vec')
layer_decode_vec = tf.keras.layers.Dense(N_VEC, activation=None, name='decode_vec')
layer_encode_mean = tf.keras.layers.Dense(1, activation=None, name='encode_mean')
layer_decode_mean = tf.keras.layers.Dense(N_VEC, activation=None, name='decode_mean')

input1 = tf.keras.Input(shape=(N_VEC,), name='input_vec1')
input2 = tf.keras.Input(shape=(N_VEC,), name='input_vec2')
vec_encoded1 = layer_encode_vec(input1)
vec_encoded2 = layer_encode_vec(input2)
mean_encoded1 = layer_encode_mean(input1)
mean_encoded2 = layer_encode_mean(input2)
mean_diff = mean_encoded2 - mean_encoded1
pred_vec1 = layer_decode_vec(vec_encoded1) + layer_decode_mean(mean_encoded1)
pred_vec2 = layer_decode_vec(vec_encoded2) + layer_decode_mean(mean_encoded2)

model = tf.keras.Model(inputs=[input1, input2], outputs=[pred_vec1, pred_vec2, mean_diff])

model.summary()


## Define the joint loss function
def loss_total(y_true, y_pred):
    loss_reconstruct = tf.reduce_mean(tf.keras.losses.MSE(y_true[0], y_pred[0]))/2 + \
                       tf.reduce_mean(tf.keras.losses.MSE(y_true[1], y_pred[1]))/2
    loss_mean = tf.reduce_mean(tf.keras.losses.MSE(y_true[2], y_pred[2]))
    return loss_reconstruct + loss_mean


## Compile model
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)
model.compile(optimizer=optimizer, loss=loss_total)


## Train model
history = model.fit(x=ds, epochs=10, steps_per_epoch=10)

Output: an example batch from the Dataset:

{'input_vec1': <tf.Tensor: shape=(4, 3), dtype=float32, numpy=
array([[-0.53022575, -0.02389329,  0.32843253],
       [-0.61793506, -0.8276422 , -1.3469328 ],
       [-0.5401968 ,  0.3141346 , -1.3638284 ],
       [-1.2189807 ,  0.23848908,  0.75108534]], dtype=float32)>, 'input_vec2': <tf.Tensor: shape=(4, 3), dtype=float32, numpy=
array([[-0.23415083,  0.27218163,  0.6245074 ],
       [-0.57636774, -0.7860749 , -1.3053654 ],
       [ 0.65463066,  1.508962  , -0.16900098],
       [-0.49326736,  0.9642024 ,  1.4767987 ]], dtype=float32)>}
{'target_vec1': <tf.Tensor: shape=(4, 3), dtype=float32, numpy=
array([[-0.53022575, -0.02389329,  0.32843253],
       [-0.61793506, -0.8276422 , -1.3469328 ],
       [-0.5401968 ,  0.3141346 , -1.3638284 ],
       [-1.2189807 ,  0.23848908,  0.75108534]], dtype=float32)>, 'target_vec2': <tf.Tensor: shape=(4, 3), dtype=float32, numpy=
array([[-0.23415083,  0.27218163,  0.6245074 ],
       [-0.57636774, -0.7860749 , -1.3053654 ],
       [ 0.65463066,  1.508962  , -0.16900098],
       [-0.49326736,  0.9642024 ,  1.4767987 ]], dtype=float32)>, 'target_mean_diff': <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[0.29607493],
       [0.04156734],
       [1.1948274 ],
       [0.7257133 ]], dtype=float32)>}

Output: the model summary:

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
==================================================================================================
input_vec1 (InputLayer)         [(None, 3)]          0                                            
__________________________________________________________________________________________________
input_vec2 (InputLayer)         [(None, 3)]          0                                            
__________________________________________________________________________________________________
encode_vec (Dense)              (None, 3)            12          input_vec1[0][0]                 
                                                                 input_vec2[0][0]                 
__________________________________________________________________________________________________
encode_mean (Dense)             (None, 1)            4           input_vec1[0][0]                 
                                                                 input_vec2[0][0]                 
__________________________________________________________________________________________________
decode_vec (Dense)              (None, 3)            12          encode_vec[0][0]                 
                                                                 encode_vec[1][0]                 
__________________________________________________________________________________________________
decode_mean (Dense)             (None, 3)            6           encode_mean[0][0]                
                                                                 encode_mean[1][0]                
__________________________________________________________________________________________________
tf.__operators__.add (TFOpLambd (None, 3)            0           decode_vec[0][0]                 
                                                                 decode_mean[0][0]                
__________________________________________________________________________________________________
tf.__operators__.add_1 (TFOpLam (None, 3)            0           decode_vec[1][0]                 
                                                                 decode_mean[1][0]                
__________________________________________________________________________________________________
tf.math.subtract (TFOpLambda)   (None, 1)            0           encode_mean[1][0]                
                                                                 encode_mean[0][0]                
==================================================================================================
Total params: 34
Trainable params: 34
Non-trainable params: 0
__________________________________________________________________________________________________

Output: the error message when calling model.fit():

Epoch 1/10
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)

...

ValueError: Found unexpected keys that do not correspond to any
Model output: dict_keys(['target_vec1', 'target_vec2', 'target_mean_diff']).
Expected: ['tf.__operators__.add', 'tf.__operators__.add_1', 'tf.math.subtract']

For the inputs and outputs, you can pass dicts to the Model, like this:

model = tf.keras.Model(
    inputs={"input_vec1": input1, "input_vec2": input2},
    outputs={
        "target_vec1": pred_vec1,
        "target_vec2": pred_vec2,
        "target_mean_diff": mean_diff,
    },
)

This avoids having to name the output layers, and lets Keras match the Dataset's target dict keys to the model outputs by name.

As for the loss: currently Keras is applying loss_total separately to each of the 3 outputs and summing the results to get the final loss, which is not what you want. So you can either list each loss separately:

model.compile(
    optimizer=optimizer,
    loss={"target_vec1": "mse", "target_vec2": "mse", "target_mean_diff": "mse"},
    loss_weights={"target_vec1": 0.5, "target_vec2": 0.5, "target_mean_diff": 1},
)
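With the outputs keyed by target name and the per-output losses configured, the original fit call should then work unchanged; note that the 0.5/0.5/1 loss_weights reproduce the /2 factors from your loss_total. A minimal sketch:

history = model.fit(x=ds, epochs=10, steps_per_epoch=10)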

Or you can train the model manually with a modified loss function that takes dict inputs. Something like:

def loss_total(y_true, y_pred):
    loss_reconstruct = (
        tf.reduce_mean(tf.keras.losses.MSE(y_true["target_vec1"], y_pred["target_vec1"])) / 2
        + tf.reduce_mean(tf.keras.losses.MSE(y_true["target_vec2"], y_pred["target_vec2"])) / 2
    )
    loss_mean = tf.reduce_mean(tf.keras.losses.MSE(y_true["target_mean_diff"], y_pred["target_mean_diff"]))
    return loss_reconstruct + loss_mean

for epoch in range(10):
    for batch, (x, y) in zip(range(10), ds):
        with tf.GradientTape() as tape:
            outputs = model(x, training=True)
            loss = loss_total(y, outputs)

        trainable_vars = model.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        optimizer.apply_gradients(zip(gradients, trainable_vars))
        print(f"Batch: {batch}, loss: {loss.numpy()}")