Keras 自定义损失作为多个输出的函数
Keras custom loss as a function of multiple outputs
我用 keras(一个卷积网络)构建了一个自定义架构。该网络有 4 个头,每个头输出一个不同大小的张量。我正在尝试编写一个自定义损失函数作为这 4 个输出的函数。我之前一直在实施自定义损失,但要么每个头的损失不同,要么每个头的损失相同。在这种情况下,我需要结合 4 个输出来计算损失。
我习惯了:
def custom_loss(y_true, y_pred):
    # Placeholder body from the question: `something` stands for whatever
    # loss value is computed from y_true and y_pred.
    return something
model.compile(optimizer, loss=custom_loss)
但在我的例子中,我需要 y_pred
作为 4 个输出的列表。我可以用零填充输出并在我的模型中添加一个连接层,但我想知道是否有更简单的方法。
编辑
我的损失函数比较复杂,可以这样写吗:
model.add_loss(custom_loss(input1, input2, output1, output2))
自定义损失定义为:
def custom_loss(input1, input2, output1, output2):
    # Placeholder body from the question: `loss` stands for the value
    # computed from the two inputs and the two outputs.
    return loss
您可以试试 model.add_loss()
函数。这个想法是将您的自定义损失构造为张量而不是函数,将其添加到模型中,并在不进一步指定损失的情况下编译模型。另请参阅这个变分自动编码器的实现(this implementation),其中使用了类似的想法。
示例:
import keras.backend as K
from keras.layers import Input, Dense
from keras.models import Model
from keras.losses import mse
import numpy as np
# Some random training data: 100 samples with 20 features each, plus two
# label arrays with 4 columns and 1 column respectively
features = np.random.rand(100,20)
labels_1 = np.random.rand(100,4)
labels_2 = np.random.rand(100,1)
# Input layer, one hidden layer
input_layer = Input((20,))
dense_1 = Dense(128)(input_layer)
# Two outputs, both computed from the same hidden layer
output_1 = Dense(4)(dense_1)
output_2 = Dense(1)(dense_1)
# Two additional 'inputs' for the labels, so the label tensors can be
# referenced when the loss tensor is constructed below
label_layer_1 = Input((4,))
label_layer_2 = Input((1,))
# Instantiate model, pass label layers as inputs
model = Model(inputs=[input_layer, label_layer_1, label_layer_2], outputs=[output_1, output_2])
# Construct your custom loss as a tensor: the mean of the product of the
# two per-output mean squared errors
loss = K.mean(mse(output_1, label_layer_1) * mse(output_2, label_layer_2))
# Add loss to model
model.add_loss(loss)
# Compile without specifying a loss
model.compile(optimizer='sgd')
# Placeholder target array handed to fit(); compile() was given no loss.
# NOTE(review): whether this placeholder is required looks version-dependent -- confirm
dummy = np.zeros((100,))
model.fit([features, labels_1, labels_2], dummy, epochs=2)
拟合模型时不需要虚拟变量(dummy)。
所以,你可以使用
model.fit([features, labels_1, labels_2], epochs=2)
它在以下环境下运行良好:
tensorflow 版本 '1.14.0'
keras 版本 '2.3.1'
您可以将您的输出打包在一个 tf.ExtensionType
中,然后在损失函数中再次解包。
我制作了一个 Colab Notebook,演示了如何在 tensorflow 2.8.0
中执行此操作。 (https://colab.research.google.com/drive/1MjlddizqFlezAUu5SOOW8svlnKQH4rog#scrollTo=pDMskk-86wFY)
使用这种方法与 add_loss()
相比的优点:
- 无需在推理时定义“虚拟”标签。
- 无需在模型内定义损失。
- 还不错
缺点:
- 您的模型现在输出一个对象,其输出作为字段而不是直接的张量(对您的用例来说,这也可能是一个优点)。
- 在撰写此答案时,
tf.ExtensionType
还不能用于 TensorFlow Serving。
我在这里添加了完整的代码,以防我不小心删除了 Colab Notebook:
import tensorflow as tf
import tensorflow_datasets as tfds
# tf.__version__ should be >= 2.8.0
print(tf.__version__)
class PackedTensor(tf.experimental.BatchableExtensionType):
    """Extension type that bundles two tensors, `output_0` and `output_1`.

    Instances are built positionally as ``PackedTensor(output_0, output_1)``
    and the individual tensors are read back through the two fields.
    """
    __name__ = 'extension_type_colab.PackedTensor'

    output_0: tf.Tensor
    output_1: tf.Tensor

    # shape and dtype hold no meaning in this context, so we use a dummy
    # (the values of output_0) to stop Keras from complaining
    shape = property(lambda self: self.output_0.shape)
    dtype = property(lambda self: self.output_0.dtype)

    class Spec:
        def __init__(self, shape, dtype=tf.float32):
            # Both fields are described with the same shape and dtype.
            self.output_0 = tf.TensorSpec(shape, dtype)
            self.output_1 = tf.TensorSpec(shape, dtype)

        # shape and dtype hold no meaning in this context, so we use a dummy
        # (those of a scalar float constant) to stop Keras from complaining
        shape: tf.TensorShape = tf.constant(1.).shape
        dtype: tf.DType = tf.constant(1.).dtype
# these two functions have no meaning, but need dummy implementations
# to stop Keras from complaining
@tf.experimental.dispatch_for_api(tf.shape)
def packed_shape(input: PackedTensor, out_type=tf.int32, name=None):
    """Return the dynamic shape of the packed tensor's `output_0` field.

    PackedTensor declares only `output_0` and `output_1`, so the shape is
    taken from `output_0`, matching the `shape` property of the class.
    NOTE(review): the parameter names presumably have to mirror those of
    tf.shape, which is why `input` shadows the builtin -- confirm.
    """
    return tf.shape(input.output_0, out_type=out_type, name=name)
@tf.experimental.dispatch_for_api(tf.cast)
def packed_cast(x: PackedTensor, dtype: str, name=None):
    """Return `x` unchanged; `dtype` and `name` are ignored.

    Dummy tf.cast handler for PackedTensor: no conversion is performed.
    """
    return x
class SCCEWithExtraOutput(tf.keras.losses.Loss):
    """Loss for models whose single output is a PackedTensor.

    The prediction is expected to expose the attributes `output_0` and
    `output_1`. This loss trains the model so that `output_0` represents
    the predicted class of the input image (as logits), while `output_1`
    is trained to always be zero (as a dummy).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Classification term; output_0 is treated as unnormalised logits.
        self.loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

    def call(self, y_true, y_pred):
        """Combine the cross-entropy on `output_0` with `|output_1|`.

        Args:
            y_true: labels passed on to the sparse categorical cross-entropy.
            y_pred: object with `output_0` and `output_1` attributes.

        Returns:
            The cross-entropy value plus the absolute value of `output_1`.
        """
        output_0, output_1 = y_pred.output_0, y_pred.output_1
        scce = self.loss_fn(y_true, output_0)
        # The absolute value pushes output_1 towards zero.
        return scce + tf.abs(output_1)
# load the datasets
(ds_train, ds_test), ds_info = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
)


def normalize_img(image, label):
    """Normalizes images: `uint8` -> `float32`."""
    return tf.cast(image, tf.float32) / 255., label


# training pipeline: normalise, cache, shuffle with a buffer as large as the
# whole training split, batch by 128, prefetch
ds_train = ds_train.map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.cache()
ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
ds_train = ds_train.batch(128)
ds_train = ds_train.prefetch(tf.data.AUTOTUNE)

# test pipeline: normalise, batch by 128, cache the batches, prefetch
ds_test = ds_test.map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_test = ds_test.batch(128)
ds_test = ds_test.cache()
ds_test = ds_test.prefetch(tf.data.AUTOTUNE)
# create a layer that packs the two outputs into a single PackedTensor
class PackingLayer(tf.keras.layers.Layer):
    """Keras layer that wraps a pair of tensors in a PackedTensor."""

    def call(self, inputs, training=None):
        """Pack `inputs`, a pair of tensors, into one PackedTensor.

        `training` is accepted but not used.
        """
        first_output, second_output = inputs
        packed_output = PackedTensor(first_output, second_output)
        return packed_output
# define the model
#
# inputs -> flatten -> hidden -> Dense(10) -> PackingLayer() -> outputs
#                           |--> Dense(1) ----^
inputs = tf.keras.Input(shape=(28, 28, 1), dtype=tf.float32)

# layers are created first, in the same order as before, then wired together
flatten_layer = tf.keras.layers.Flatten()
hidden_layer = tf.keras.layers.Dense(128, activation='relu')
first_output_layer = tf.keras.layers.Dense(10)
second_output_layer = tf.keras.layers.Dense(1)
packing_layer = PackingLayer()

# both heads branch off the same hidden representation
hidden = hidden_layer(flatten_layer(inputs))
first_output, second_output = first_output_layer(hidden), second_output_layer(hidden)
outputs = packing_layer((first_output, second_output))

model = tf.keras.Model(inputs=inputs, outputs=outputs)

# the packed output is consumed by the custom loss as a single y_pred
model.compile(
    optimizer=tf.keras.optimizers.Adam(0.001),
    loss=SCCEWithExtraOutput(),
    # metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
)

model.fit(
    ds_train,
    epochs=1,
    # validation_data=ds_test,
)

model.save("savedmodel")
# Run a few training batches through the model and print, for each one, the
# shapes of the two packed outputs and the type of the returned object.
for index, sample in enumerate(ds_train):
    # sample[0] is the first element of each dataset item
    predicted_packed_tensor = model(sample[0])
    print(predicted_packed_tensor.output_0.shape, predicted_packed_tensor.output_1.shape)
    print(type(predicted_packed_tensor))
    if index > 10:
        break
# prove we can also load and infer the model in a completely new process
# notice that as the class PackedTensor does not exist in this process,
# the model now returns a tensorflow.python.framework.extension_type.AnonymousExtensionType
# with attributes "output_0" and "output_1".
import subprocess

# Source of the child program. The bodies of the function, the loop and the
# `if` must be indented inside the string, otherwise the child interpreter
# stops with an IndentationError before running anything.
script = """
import tensorflow as tf
import tensorflow_datasets as tfds
model = tf.saved_model.load("savedmodel")
(ds_train, ds_test), ds_info = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
)
def normalize_img(image, label):
    return tf.cast(image, tf.float32) / 255., label
ds_train = ds_train.map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.batch(20)
for index, sample in enumerate(ds_train):
    predicted = model(sample[0])
    print(predicted.output_0.shape, predicted.output_1.shape)
    print(type(predicted))
    if index > 5:
        break
"""
pipes = subprocess.Popen(["python3", "-c", script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
std_out, std_err = pipes.communicate()
for line in std_out.decode().split("\n"):
    print(line)
# Report a failing child process instead of silently dropping its stderr.
if pipes.returncode != 0:
    print(std_err.decode())
我用 keras(一个卷积网络)构建了一个自定义架构。该网络有 4 个头,每个头输出一个不同大小的张量。我正在尝试编写一个自定义损失函数作为这 4 个输出的函数。我之前一直在实施自定义损失,但要么每个头的损失不同,要么每个头的损失相同。在这种情况下,我需要结合 4 个输出来计算损失。
我习惯了:
def custom_loss(y_true, y_pred):
    # Placeholder body from the question: `something` stands for whatever
    # loss value is computed from y_true and y_pred.
    return something
model.compile(optimizer, loss=custom_loss)
但在我的例子中,我需要 y_pred
作为 4 个输出的列表。我可以用零填充输出并在我的模型中添加一个连接层,但我想知道是否有更简单的方法。
编辑
我的损失函数比较复杂,可以这样写吗:
model.add_loss(custom_loss(input1, input2, output1, output2))
自定义损失定义为:
def custom_loss(input1, input2, output1, output2):
    # Placeholder body from the question: `loss` stands for the value
    # computed from the two inputs and the two outputs.
    return loss
您可以试试 model.add_loss()
函数。这个想法是将您的自定义损失构造为张量而不是函数,将其添加到模型中,并在不进一步指定损失的情况下编译模型。另请参阅这个变分自动编码器的实现(this implementation),其中使用了类似的想法。
示例:
import keras.backend as K
from keras.layers import Input, Dense
from keras.models import Model
from keras.losses import mse
import numpy as np
# Some random training data: 100 samples with 20 features each, plus two
# label arrays with 4 columns and 1 column respectively
features = np.random.rand(100,20)
labels_1 = np.random.rand(100,4)
labels_2 = np.random.rand(100,1)
# Input layer, one hidden layer
input_layer = Input((20,))
dense_1 = Dense(128)(input_layer)
# Two outputs, both computed from the same hidden layer
output_1 = Dense(4)(dense_1)
output_2 = Dense(1)(dense_1)
# Two additional 'inputs' for the labels, so the label tensors can be
# referenced when the loss tensor is constructed below
label_layer_1 = Input((4,))
label_layer_2 = Input((1,))
# Instantiate model, pass label layers as inputs
model = Model(inputs=[input_layer, label_layer_1, label_layer_2], outputs=[output_1, output_2])
# Construct your custom loss as a tensor: the mean of the product of the
# two per-output mean squared errors
loss = K.mean(mse(output_1, label_layer_1) * mse(output_2, label_layer_2))
# Add loss to model
model.add_loss(loss)
# Compile without specifying a loss
model.compile(optimizer='sgd')
# Placeholder target array handed to fit(); compile() was given no loss.
# NOTE(review): whether this placeholder is required looks version-dependent -- confirm
dummy = np.zeros((100,))
model.fit([features, labels_1, labels_2], dummy, epochs=2)
拟合模型时不需要虚拟变量(dummy)。
所以,你可以使用 model.fit([features, labels_1, labels_2], epochs=2)
它在以下环境下运行良好:
tensorflow 版本 '1.14.0',keras 版本 '2.3.1'
您可以将您的输出打包在一个 tf.ExtensionType
中,然后在损失函数中再次解包。
我制作了一个 Colab Notebook,演示了如何在 tensorflow 2.8.0
中执行此操作。 (https://colab.research.google.com/drive/1MjlddizqFlezAUu5SOOW8svlnKQH4rog#scrollTo=pDMskk-86wFY)
使用这种方法与 add_loss()
相比的优点:
- 无需在推理时定义“虚拟”标签。
- 无需在模型内定义损失。
- 还不错
缺点:
- 您的模型现在输出一个对象,其输出作为字段而不是直接的张量(对您的用例来说,这也可能是一个优点)。
- 在撰写此答案时,
tf.ExtensionType
还不能用于 TensorFlow Serving。
我在这里添加了完整的代码,以防我不小心删除了 Colab Notebook:
import tensorflow as tf
import tensorflow_datasets as tfds
# tf.__version__ should be >= 2.8.0
print(tf.__version__)
class PackedTensor(tf.experimental.BatchableExtensionType):
    """Extension type that bundles two tensors, `output_0` and `output_1`.

    Instances are built positionally as ``PackedTensor(output_0, output_1)``
    and the individual tensors are read back through the two fields.
    """
    __name__ = 'extension_type_colab.PackedTensor'

    output_0: tf.Tensor
    output_1: tf.Tensor

    # shape and dtype hold no meaning in this context, so we use a dummy
    # (the values of output_0) to stop Keras from complaining
    shape = property(lambda self: self.output_0.shape)
    dtype = property(lambda self: self.output_0.dtype)

    class Spec:
        def __init__(self, shape, dtype=tf.float32):
            # Both fields are described with the same shape and dtype.
            self.output_0 = tf.TensorSpec(shape, dtype)
            self.output_1 = tf.TensorSpec(shape, dtype)

        # shape and dtype hold no meaning in this context, so we use a dummy
        # (those of a scalar float constant) to stop Keras from complaining
        shape: tf.TensorShape = tf.constant(1.).shape
        dtype: tf.DType = tf.constant(1.).dtype
# these two functions have no meaning, but need dummy implementations
# to stop Keras from complaining
@tf.experimental.dispatch_for_api(tf.shape)
def packed_shape(input: PackedTensor, out_type=tf.int32, name=None):
    """Return the dynamic shape of the packed tensor's `output_0` field.

    PackedTensor declares only `output_0` and `output_1`, so the shape is
    taken from `output_0`, matching the `shape` property of the class.
    NOTE(review): the parameter names presumably have to mirror those of
    tf.shape, which is why `input` shadows the builtin -- confirm.
    """
    return tf.shape(input.output_0, out_type=out_type, name=name)
@tf.experimental.dispatch_for_api(tf.cast)
def packed_cast(x: PackedTensor, dtype: str, name=None):
    """Return `x` unchanged; `dtype` and `name` are ignored.

    Dummy tf.cast handler for PackedTensor: no conversion is performed.
    """
    return x
class SCCEWithExtraOutput(tf.keras.losses.Loss):
    """Loss for models whose single output is a PackedTensor.

    The prediction is expected to expose the attributes `output_0` and
    `output_1`. This loss trains the model so that `output_0` represents
    the predicted class of the input image (as logits), while `output_1`
    is trained to always be zero (as a dummy).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Classification term; output_0 is treated as unnormalised logits.
        self.loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

    def call(self, y_true, y_pred):
        """Combine the cross-entropy on `output_0` with `|output_1|`.

        Args:
            y_true: labels passed on to the sparse categorical cross-entropy.
            y_pred: object with `output_0` and `output_1` attributes.

        Returns:
            The cross-entropy value plus the absolute value of `output_1`.
        """
        output_0, output_1 = y_pred.output_0, y_pred.output_1
        scce = self.loss_fn(y_true, output_0)
        # The absolute value pushes output_1 towards zero.
        return scce + tf.abs(output_1)
# load the datasets
(ds_train, ds_test), ds_info = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
)


def normalize_img(image, label):
    """Normalizes images: `uint8` -> `float32`."""
    return tf.cast(image, tf.float32) / 255., label


# training pipeline: normalise, cache, shuffle with a buffer as large as the
# whole training split, batch by 128, prefetch
ds_train = ds_train.map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.cache()
ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
ds_train = ds_train.batch(128)
ds_train = ds_train.prefetch(tf.data.AUTOTUNE)

# test pipeline: normalise, batch by 128, cache the batches, prefetch
ds_test = ds_test.map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_test = ds_test.batch(128)
ds_test = ds_test.cache()
ds_test = ds_test.prefetch(tf.data.AUTOTUNE)
# create a layer that packs the two outputs into a single PackedTensor
class PackingLayer(tf.keras.layers.Layer):
    """Keras layer that wraps a pair of tensors in a PackedTensor."""

    def call(self, inputs, training=None):
        """Pack `inputs`, a pair of tensors, into one PackedTensor.

        `training` is accepted but not used.
        """
        first_output, second_output = inputs
        packed_output = PackedTensor(first_output, second_output)
        return packed_output
# define the model
#
# inputs -> flatten -> hidden -> Dense(10) -> PackingLayer() -> outputs
#                           |--> Dense(1) ----^
inputs = tf.keras.Input(shape=(28, 28, 1), dtype=tf.float32)

# layers are created first, in the same order as before, then wired together
flatten_layer = tf.keras.layers.Flatten()
hidden_layer = tf.keras.layers.Dense(128, activation='relu')
first_output_layer = tf.keras.layers.Dense(10)
second_output_layer = tf.keras.layers.Dense(1)
packing_layer = PackingLayer()

# both heads branch off the same hidden representation
hidden = hidden_layer(flatten_layer(inputs))
first_output, second_output = first_output_layer(hidden), second_output_layer(hidden)
outputs = packing_layer((first_output, second_output))

model = tf.keras.Model(inputs=inputs, outputs=outputs)

# the packed output is consumed by the custom loss as a single y_pred
model.compile(
    optimizer=tf.keras.optimizers.Adam(0.001),
    loss=SCCEWithExtraOutput(),
    # metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
)

model.fit(
    ds_train,
    epochs=1,
    # validation_data=ds_test,
)

model.save("savedmodel")
# Run a few training batches through the model and print, for each one, the
# shapes of the two packed outputs and the type of the returned object.
for index, sample in enumerate(ds_train):
    # sample[0] is the first element of each dataset item
    predicted_packed_tensor = model(sample[0])
    print(predicted_packed_tensor.output_0.shape, predicted_packed_tensor.output_1.shape)
    print(type(predicted_packed_tensor))
    if index > 10:
        break
# prove we can also load and infer the model in a completely new process
# notice that as the class PackedTensor does not exist in this process,
# the model now returns a tensorflow.python.framework.extension_type.AnonymousExtensionType
# with attributes "output_0" and "output_1".
import subprocess

# Source of the child program. The bodies of the function, the loop and the
# `if` must be indented inside the string, otherwise the child interpreter
# stops with an IndentationError before running anything.
script = """
import tensorflow as tf
import tensorflow_datasets as tfds
model = tf.saved_model.load("savedmodel")
(ds_train, ds_test), ds_info = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
)
def normalize_img(image, label):
    return tf.cast(image, tf.float32) / 255., label
ds_train = ds_train.map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.batch(20)
for index, sample in enumerate(ds_train):
    predicted = model(sample[0])
    print(predicted.output_0.shape, predicted.output_1.shape)
    print(type(predicted))
    if index > 5:
        break
"""
pipes = subprocess.Popen(["python3", "-c", script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
std_out, std_err = pipes.communicate()
for line in std_out.decode().split("\n"):
    print(line)
# Report a failing child process instead of silently dropping its stderr.
if pipes.returncode != 0:
    print(std_err.decode())