Keras custom loss as a function of multiple outputs

I've built a custom architecture with keras (a convolutional network). The network has 4 heads, each outputting a tensor of a different size. I'm trying to write a custom loss function as a function of these 4 outputs. I've implemented custom losses before, but each head either had its own loss or all heads shared the same one. In this case, the loss needs to combine all 4 outputs.

I'm used to writing:

def custom_loss(y_true, y_pred):
    return something
model.compile(optimizer, loss=custom_loss)

But in my case I need y_pred to be the list of the 4 outputs. I could zero-pad the outputs and add a concatenation layer to my model, but I'm wondering whether there is an easier way.
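
For reference, the concatenation workaround would look roughly like this (a minimal sketch with only two heads and illustrative shapes; all names are placeholders):

import numpy as np
import keras.backend as K
from keras.layers import Input, Dense, Concatenate
from keras.models import Model

inp = Input((20,))
hidden = Dense(64, activation='relu')(inp)
head_1 = Dense(4)(hidden)   # first head, width 4
head_2 = Dense(1)(hidden)   # second head, width 1

# Merge the heads into one tensor so a single loss sees all of them
merged = Concatenate(axis=-1)([head_1, head_2])
model = Model(inp, merged)

def custom_loss(y_true, y_pred):
    # Slice the merged tensor back into the individual heads
    pred_1, pred_2 = y_pred[:, :4], y_pred[:, 4:]
    true_1, true_2 = y_true[:, :4], y_true[:, 4:]
    # Any function that combines the heads goes here
    return K.mean(K.square(pred_1 - true_1)) + K.mean(K.abs(pred_2 - true_2))

model.compile(optimizer='sgd', loss=custom_loss)
model.fit(np.random.rand(16, 20), np.random.rand(16, 5), epochs=1)

This works, but it forces all labels into one packed y_true array, hence the question about an easier way.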

Edit

My loss function is rather complicated. Could I write it like this:

model.add_loss(custom_loss(input1, input2, output1, output2))

where the custom loss is defined as:

def custom_loss(input1, input2, output1, output2):
    return loss

You can try the model.add_loss() function. The idea is to construct your custom loss as a tensor instead of a function, add it to the model, and compile the model without specifying a loss any further. See also this implementation of a variational autoencoder, where a similar idea is used.

Example:

import keras.backend as K
from keras.layers import Input, Dense
from keras.models import Model
from keras.losses import mse
import numpy as np

# Some random training data
features = np.random.rand(100,20)
labels_1 = np.random.rand(100,4)
labels_2 = np.random.rand(100,1)

# Input layer, one hidden layer
input_layer = Input((20,))
dense_1 = Dense(128)(input_layer)

# Two outputs
output_1 = Dense(4)(dense_1)
output_2 = Dense(1)(dense_1)

# Two additional 'inputs' for the labels
label_layer_1 = Input((4,))
label_layer_2 = Input((1,))

# Instantiate model, pass label layers as inputs
model = Model(inputs=[input_layer, label_layer_1, label_layer_2], outputs=[output_1, output_2])

# Construct your custom loss as a tensor
loss = K.mean(mse(output_1, label_layer_1) * mse(output_2, label_layer_2))

# Add loss to model
model.add_loss(loss)

# Compile without specifying a loss
model.compile(optimizer='sgd')

dummy = np.zeros((100,))
model.fit([features, labels_1, labels_2], dummy, epochs=2)

You don't need the dummy variable when fitting the model, so you can just use model.fit([features, labels_1, labels_2], epochs=2).

This runs fine under tensorflow version '1.14.0' and keras version '2.3.1'.

You can pack your outputs into a tf.ExtensionType and unpack them again inside the loss function.
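
Stripped of the Keras plumbing in the full code below, the core pack/unpack idea is just this (a minimal sketch; Packed, a and b are placeholder names):

import tensorflow as tf

# An ExtensionType is a typed, immutable container of tensors:
# fields are declared as type annotations and set via the constructor.
class Packed(tf.experimental.ExtensionType):
    a: tf.Tensor
    b: tf.Tensor

packed = Packed(a=tf.ones([2, 4]), b=tf.zeros([2, 1]))

# A loss function can simply read the fields back out
print(packed.a.shape, packed.b.shape)  # (2, 4) (2, 1)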

I made a Colab notebook that demonstrates how to do this in tensorflow 2.8.0. (https://colab.research.google.com/drive/1MjlddizqFlezAUu5SOOW8svlnKQH4rog#scrollTo=pDMskk-86wFY)

Advantages of this approach compared to add_loss():

  • No need to define "dummy" labels at inference time.
  • No need to define the loss inside the model.
  • It's not bad.

Disadvantages:

  • Your model now outputs an object with the outputs as fields instead of plain tensors (which might actually be a pro for your use case).
  • At the time of writing this answer, tf.ExtensionTypes do not work with TensorFlow Serving.

I'm adding the full code here in case I ever accidentally delete the Colab notebook:

import tensorflow as tf
import tensorflow_datasets as tfds
# tf.__version__ should be >= 2.8.0
print(tf.__version__)


class PackedTensor(tf.experimental.BatchableExtensionType):
    __name__ = 'extension_type_colab.PackedTensor'

    output_0: tf.Tensor
    output_1: tf.Tensor

    # shape and dtype hold no real meaning in this context; we delegate to
    # output_0 just to stop Keras from complaining

    shape = property(lambda self: self.output_0.shape)
    dtype = property(lambda self: self.output_0.dtype)

    class Spec:

        def __init__(self, shape, dtype=tf.float32):
            self.output_0 = tf.TensorSpec(shape, dtype)
            self.output_1 = tf.TensorSpec(shape, dtype)

        # shape and dtype hold no meaning in this context, so we use a dummy
        # to stop Keras from complaining
        shape: tf.TensorShape = tf.constant(1.).shape 
        dtype: tf.DType = tf.constant(1.).dtype


# these two functions have no meaning, but need dummy implementations
# to stop Keras from complaining
@tf.experimental.dispatch_for_api(tf.shape)
def packed_shape(input: PackedTensor, out_type=tf.int32, name=None):
    return tf.shape(input.output_0)

@tf.experimental.dispatch_for_api(tf.cast)
def packed_cast(x: PackedTensor, dtype: str, name=None):
    return x


class SCCEWithExtraOutput(tf.keras.losses.Loss):
    """ This custom loss function is designed for models with an PackedTensor as
    a single output, so with attributes outputs_0 and outputs_1. This loss will 
    train a model so that outputs_0 represent the predicted class of the input
    image, and outputs_1 will be trained to always be zero (as a dummy). 
    """
    def __init__(self, *args, **kwargs):
        super(SCCEWithExtraOutput, self).__init__(*args, **kwargs)
        self.loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

    def call(self, y_true, y_pred):
        output_0, output_1 = y_pred.output_0, y_pred.output_1
        scce = self.loss_fn(y_true, output_0)
        return scce + tf.abs(output_1)



# load the datasets
(ds_train, ds_test), ds_info = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
)
def normalize_img(image, label):
  """Normalizes images: `uint8` -> `float32`."""
  return tf.cast(image, tf.float32) / 255., label

ds_train = ds_train.map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.cache()
ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
ds_train = ds_train.batch(128)
ds_train = ds_train.prefetch(tf.data.AUTOTUNE)
ds_test = ds_test.map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_test = ds_test.batch(128)
ds_test = ds_test.cache()
ds_test = ds_test.prefetch(tf.data.AUTOTUNE)


# create a layer that packs the outputs into a PackedTensor
class PackingLayer(tf.keras.layers.Layer):
  def call(self, inputs, training=None):
    first_output, second_output = inputs
    packed_output = PackedTensor(first_output, second_output)
    return packed_output

# define the model
#
# inputs -> flatten -> hidden -> Dense(10) -> PackingLayer() -> outputs
#                           |--> Dense(1)  ----^ 
inputs = tf.keras.Input(shape=(28, 28, 1), dtype=tf.float32)
flatten_layer = tf.keras.layers.Flatten()
hidden_layer = tf.keras.layers.Dense(128, activation='relu')
first_output_layer = tf.keras.layers.Dense(10)
second_output_layer = tf.keras.layers.Dense(1)
packing_layer = PackingLayer()

hidden = flatten_layer(inputs)
hidden = hidden_layer(hidden)
first_output = first_output_layer(hidden)
second_output = second_output_layer(hidden)
outputs = packing_layer((first_output, second_output))
model = tf.keras.Model(inputs=inputs, outputs=outputs)

model.compile(
    optimizer=tf.keras.optimizers.Adam(0.001),
    loss=SCCEWithExtraOutput(),
    # metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
)

model.fit(
    ds_train,
    epochs=1,
    # validation_data=ds_test,
)
model.save("savedmodel")

for index, sample in enumerate(ds_train):
  predicted_packed_tensor = model(sample[0])
  print(predicted_packed_tensor.output_0.shape, predicted_packed_tensor.output_1.shape)
  print(type(predicted_packed_tensor))
  if index > 10:
    break



# prove we can also load and infer the model in a completely new process
# notice that as the class PackedTensor does not exist in this process,
# the model now returns a tensorflow.python.framework.extension_type.AnonymousExtensionType
# with attributes "output_0" and "output_1".

import subprocess

script = """
import tensorflow as tf
import tensorflow_datasets as tfds
model = tf.saved_model.load("savedmodel")
(ds_train, ds_test), ds_info = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
)
def normalize_img(image, label):
  return tf.cast(image, tf.float32) / 255., label

ds_train = ds_train.map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.batch(20)

for index, sample in enumerate(ds_train):
  predicted = model(sample[0])
  print(predicted.output_0.shape, predicted.output_1.shape)
  print(type(predicted))
  if index > 5:
    break
"""
pipes = subprocess.Popen(["python3", "-c", script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
std_out, std_err = pipes.communicate()
for line in std_out.decode().split("\n"):
  print(line)