How to create joint loss with paired Dataset samples in Tensorflow Keras API?
I am trying to train an autoencoder with a constraint that forces one or more of the hidden/encoded nodes/neurons to have an interpretable value. My training approach uses paired images (although after training, the model should operate on a single image) and a joint loss function that includes (1) a reconstruction loss for each of the images and (2) a comparison between the hidden/encoded vectors from each of the two images.
I created a simple toy problem and model to make this clearer. In the toy problem, the autoencoder's input is a vector of length 3. The encoding uses one dense layer to compute the mean (a scalar) and another dense layer to compute some other representation of the vector (given my construction, it will likely just learn an identity matrix, i.e., copy the input vector). See the figure below. The lowest node of the hidden layer is intended to compute the mean of the input vector. The rest of the hidden nodes are unconstrained, apart from having to accommodate a reconstruction that matches the input.
The figure below shows how I wish to train the model using paired images. "MSE" is mean squared error, although the identity of the actual function is not important for the question I am asking here. The loss function is the sum of the reconstruction losses and the mean-estimation loss.
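In symbols (my paraphrase, matching the weighting used in the code below):

loss_total = MSE(x1, x1_hat)/2 + MSE(x2, x2_hat)/2 + MSE(mean_diff_true, mean_diff_pred)

where x1_hat and x2_hat are the reconstructions of the two input vectors and mean_diff_pred is the difference between their two encoded means.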
I have attempted to create (1) a tf.data.Dataset that generates paired vectors, (2) a Keras model, and (3) a custom loss function. However, I am failing to understand how to do this correctly for this particular situation.
I cannot get Model.fit() to run, nor to associate the model outputs with the dataset targets as expected. See the code and errors below. Can anyone help? I have done many Google and Stack Overflow searches and still do not understand how to implement it.
import tensorflow as tf
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
DTYPE = tf.dtypes.float32
N_VEC = 3
def my_generator(n):
while True:
        # Create two identical vectors of length n, except with different means.
# An internal layer (single neuron) of the model should predict the
# mean of the input vector. To train it to do so, with paired
# vector inputs, use a loss function that penalizes incorrect
# predictions of the difference of the means of two input vectors.
input_vec1 = tf.random.normal((n,), dtype=DTYPE)
target_mean_diff = tf.random.normal((1,), dtype=DTYPE)
input_vec2 = input_vec1 + target_mean_diff
# Model is a constrained autoencoder. Output targets are
# identical to the input vectors. Including them as explicit
# targets in this generator, for generalization.
target_vec1 = tf.identity(input_vec1)
target_vec2 = tf.identity(input_vec2)
yield ({'input_vec1':input_vec1,
'input_vec2':input_vec2},
{'target_vec1':target_vec1,
'target_vec2':target_vec2,
'target_mean_diff':target_mean_diff})
def my_dataset(n, batch_size=4):
ds = tf.data.Dataset.from_generator(my_generator,
output_signature=({'input_vec1':tf.TensorSpec(shape=(n,), dtype=DTYPE),
'input_vec2':tf.TensorSpec(shape=(n,), dtype=DTYPE)},
{'target_vec1':tf.TensorSpec(shape=(n,), dtype=DTYPE),
'target_vec2':tf.TensorSpec(shape=(n,), dtype=DTYPE),
'target_mean_diff':tf.TensorSpec(shape=(1,), dtype=DTYPE)}),
args=(n,))
ds = ds.batch(batch_size)
return ds
## Do a brief test using the Dataset
ds = my_dataset(N_VEC, batch_size=4)
ds_iter = iter(ds)
dict_inputs, dict_targets = next(ds_iter)
print(dict_inputs)
print(dict_targets)
## Define the Model
layer_encode_vec = tf.keras.layers.Dense(N_VEC, activation=None, name='encode_vec')
layer_decode_vec = tf.keras.layers.Dense(N_VEC, activation=None, name='decode_vec')
layer_encode_mean = tf.keras.layers.Dense(1, activation=None, name='encode_mean')
layer_decode_mean = tf.keras.layers.Dense(N_VEC, activation=None, name='decode_mean')
input1 = tf.keras.Input(shape=(N_VEC,), name='input_vec1')
input2 = tf.keras.Input(shape=(N_VEC,), name='input_vec2')
vec_encoded1 = layer_encode_vec(input1)
vec_encoded2 = layer_encode_vec(input2)
mean_encoded1 = layer_encode_mean(input1)
mean_encoded2 = layer_encode_mean(input2)
mean_diff = mean_encoded2 - mean_encoded1
pred_vec1 = layer_decode_vec(vec_encoded1) + layer_decode_mean(mean_encoded1)
pred_vec2 = layer_decode_vec(vec_encoded2) + layer_decode_mean(mean_encoded2)
model = tf.keras.Model(inputs=[input1, input2], outputs=[pred_vec1, pred_vec2, mean_diff])
print(model.summary())
## Define the joint loss function
def loss_total(y_true, y_pred):
    loss_reconstruct = tf.reduce_mean(tf.keras.losses.MSE(y_true[0], y_pred[0]))/2 + \
                       tf.reduce_mean(tf.keras.losses.MSE(y_true[1], y_pred[1]))/2
    loss_mean = tf.reduce_mean(tf.keras.losses.MSE(y_true[2], y_pred[2]))
return loss_reconstruct + loss_mean
## Compile model
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)
model.compile(optimizer=optimizer, loss=loss_total)
## Train model
history = model.fit(x=ds, epochs=10, steps_per_epoch=10)
Output: an example batch from the Dataset:
{'input_vec1': <tf.Tensor: shape=(4, 3), dtype=float32, numpy=
array([[-0.53022575, -0.02389329, 0.32843253],
[-0.61793506, -0.8276422 , -1.3469328 ],
[-0.5401968 , 0.3141346 , -1.3638284 ],
[-1.2189807 , 0.23848908, 0.75108534]], dtype=float32)>, 'input_vec2': <tf.Tensor: shape=(4, 3), dtype=float32, numpy=
array([[-0.23415083, 0.27218163, 0.6245074 ],
[-0.57636774, -0.7860749 , -1.3053654 ],
[ 0.65463066, 1.508962 , -0.16900098],
[-0.49326736, 0.9642024 , 1.4767987 ]], dtype=float32)>}
{'target_vec1': <tf.Tensor: shape=(4, 3), dtype=float32, numpy=
array([[-0.53022575, -0.02389329, 0.32843253],
[-0.61793506, -0.8276422 , -1.3469328 ],
[-0.5401968 , 0.3141346 , -1.3638284 ],
[-1.2189807 , 0.23848908, 0.75108534]], dtype=float32)>, 'target_vec2': <tf.Tensor: shape=(4, 3), dtype=float32, numpy=
array([[-0.23415083, 0.27218163, 0.6245074 ],
[-0.57636774, -0.7860749 , -1.3053654 ],
[ 0.65463066, 1.508962 , -0.16900098],
[-0.49326736, 0.9642024 , 1.4767987 ]], dtype=float32)>, 'target_mean_diff': <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[0.29607493],
[0.04156734],
[1.1948274 ],
[0.7257133 ]], dtype=float32)>}
Output: model summary:
Model: "model"
__________________________________________________________________________________________________
Layer (type) Output Shape Param # Connected to
==================================================================================================
input_vec1 (InputLayer) [(None, 3)] 0
__________________________________________________________________________________________________
input_vec2 (InputLayer) [(None, 3)] 0
__________________________________________________________________________________________________
encode_vec (Dense) (None, 3) 12 input_vec1[0][0]
input_vec2[0][0]
__________________________________________________________________________________________________
encode_mean (Dense) (None, 1) 4 input_vec1[0][0]
input_vec2[0][0]
__________________________________________________________________________________________________
decode_vec (Dense) (None, 3) 12 encode_vec[0][0]
encode_vec[1][0]
__________________________________________________________________________________________________
decode_mean (Dense) (None, 3) 6 encode_mean[0][0]
encode_mean[1][0]
__________________________________________________________________________________________________
tf.__operators__.add (TFOpLambd (None, 3) 0 decode_vec[0][0]
decode_mean[0][0]
__________________________________________________________________________________________________
tf.__operators__.add_1 (TFOpLam (None, 3) 0 decode_vec[1][0]
decode_mean[1][0]
__________________________________________________________________________________________________
tf.math.subtract (TFOpLambda) (None, 1) 0 encode_mean[1][0]
encode_mean[0][0]
==================================================================================================
Total params: 34
Trainable params: 34
Non-trainable params: 0
__________________________________________________________________________________________________
Output: error message when calling model.fit():
Epoch 1/10
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
...
ValueError: Found unexpected keys that do not correspond to any
Model output: dict_keys(['target_vec1', 'target_vec2', 'target_mean_diff']).
Expected: ['tf.__operators__.add', 'tf.__operators__.add_1', 'tf.math.subtract']
For inputs and outputs, you can pass a dict to Model, like this:
model = tf.keras.Model(
inputs={"input_vec1": input1, "input_vec2": input2},
outputs={
"target_vec1": pred_vec1,
"target_vec2": pred_vec2,
"target_mean_diff": mean_diff,
},
)
This avoids having to name the output layers.
As for the loss: Keras is currently applying loss_total to each of the 3 outputs separately and summing the results to get the final loss, which is not what you want. So you can either specify each loss individually:
model.compile(
optimizer=optimizer,
loss={"target_vec1": "mse", "target_vec2": "mse", "target_mean_diff": "mse"},
loss_weights={"target_vec1": 0.5, "target_vec2": 0.5, "target_mean_diff": 1},
)
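With the output names now matching the dataset's target keys, model.fit() should then consume the Dataset from the question unchanged (a sketch, reusing my_dataset from above):

ds = my_dataset(N_VEC, batch_size=4)
history = model.fit(ds, epochs=10, steps_per_epoch=10)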
Or you can train the model manually, using a modified loss function that takes dict inputs. Something like:
def loss_total(y_true, y_pred):
loss_reconstruct = (
tf.reduce_mean(tf.keras.losses.MSE(y_true["target_vec1"], y_pred["target_vec1"])) / 2
+ tf.reduce_mean(tf.keras.losses.MSE(y_true["target_vec2"], y_pred["target_vec2"])) / 2
)
loss_mean = tf.reduce_mean(tf.keras.losses.MSE(y_true["target_mean_diff"], y_pred["target_mean_diff"]))
return loss_reconstruct + loss_mean
for epoch in range(10):
for batch, (x, y) in zip(range(10), ds):
with tf.GradientTape() as tape:
outputs = model(x, training=True)
loss = loss_total(y, outputs)
trainable_vars = model.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
optimizer.apply_gradients(zip(gradients, trainable_vars))
print(f"Batch: {batch}, loss: {loss.numpy()}")