How do you fit a tf.Dataset to a Keras Autoencoder Model when the Dataset has been generated using TFX?

Question

As the title suggests, I have been trying to create a pipeline to train an Autoencoder model using TFX. The problem I'm having is fitting the tf.Dataset returned by the DataAccessor.tf_dataset_factory object to the Autoencoder.

Below I have summarised the steps I've taken on this project; if you want to skip the background information, the questions are at the bottom.

Introduction

TFX Pipeline

The TFX components I have used so far (as the code below shows) include ExampleGen, Transform, and the Trainer component.

Model

The model I am trying to train is based on the example listed here: https://www.tensorflow.org/tutorials/generative/autoencoder. However, my model is being trained on tabular data to look for anomalous results, rather than on image data.

Since I have tried a couple of solutions, I have defined the model in both the subclassed Keras.Model format and the functional Keras.layers format, both of which are outlined below:

Subclassing Keras.Model
import tensorflow as tf
from tensorflow import keras

class Autoencoder(keras.models.Model):
    def __init__(self, features):
        super(Autoencoder, self).__init__()
        self.features = features  # store the feature list so call() can reference it
        
        self.encoder = tf.keras.Sequential([
            keras.layers.Dense(82, activation = 'relu'),
            keras.layers.Dense(32, activation = 'relu'),
            keras.layers.Dense(16, activation = 'relu'),
            keras.layers.Dense(8, activation = 'relu')
        ])
        
        self.decoder = tf.keras.Sequential([
            keras.layers.Dense(16, activation = 'relu'),
            keras.layers.Dense(32, activation = 'relu'),
            keras.layers.Dense(len(features), activation = 'sigmoid')
        ])

    def call(self, x):
        inputs = [keras.layers.Input(shape = (1,), name = f) for f in self.features]
        dense = keras.layers.concatenate(inputs)
        
        encoded = self.encoder(dense)
        decoded = self.decoder(encoded)
    
        return decoded
Functional Keras.layers model
from typing import List

def _build_keras_model(features: List[str]) -> tf.keras.Model:
    inputs = [keras.layers.Input(shape = (1,), name = f) for f in features]
    dense = keras.layers.concatenate(inputs)

    dense = keras.layers.Dense(32, activation = 'relu')(dense)
    dense = keras.layers.Dense(16, activation = 'relu')(dense)
    dense = keras.layers.Dense(8, activation = 'relu')(dense)
    dense = keras.layers.Dense(16, activation = 'relu')(dense)
    dense = keras.layers.Dense(32, activation = 'relu')(dense)
    outputs = keras.layers.Dense(len(features), activation = 'sigmoid')(dense)
    
    model = keras.Model(inputs = inputs, outputs = outputs)
    model.compile(
        optimizer = 'adam',
        loss = 'mae'
    )

    return model
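
For context, here is a quick usage sketch of the functional version (the feature names are placeholders, not my real columns): the model takes one scalar input per feature and reconstructs all of them together as a single vector.

features = ['feature_a', 'feature_b', 'feature_c']  # placeholder names, for illustration only
model = _build_keras_model(features)
model.summary()  # three scalar inputs -> bottleneck of 8 -> Dense(3) output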

TFX Trainer Component

To create the Trainer component I mainly followed the implementation details listed here: https://www.tensorflow.org/tfx/guide/trainer

as well as the default penguin example here: https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple#write_model_training_code

run_fn definition
from tfx import v1 as tfx
import tensorflow_transform as tft

def run_fn(fn_args: tfx.components.FnArgs) -> None:
    tft_output = tft.TFTransformOutput(fn_args.transform_output)
    
    train_dataset = _input_fn(
        file_pattern = fn_args.train_files,
        data_accessor = fn_args.data_accessor,
        tf_transform_output = tft_output,
        batch_size = fn_args.train_steps
    )

    eval_dataset = _input_fn(
        file_pattern = fn_args.eval_files,
        data_accessor = fn_args.data_accessor,
        tf_transform_output = tft_output,
        batch_size = fn_args.custom_config['eval_batch_size']
    )

#   model = Autoencoder(
#       features = fn_args.custom_config['features']
#   )
    model = _build_keras_model(features = fn_args.custom_config['features'])
        
    model.compile(optimizer = 'adam', loss = 'mse')
    
    model.fit(
        train_dataset,
        steps_per_epoch = fn_args.train_steps,
        validation_data = eval_dataset,
        validation_steps = fn_args.eval_steps
    )
    
    ...
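
The elided tail simply exports the trained model, as in the penguin tutorial linked above:

    # As in the penguin tutorial: export the trained model for serving
    model.save(fn_args.serving_model_dir, save_format = 'tf')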
_input_fn definition
from tfx_bsl.public import tfxio

def _apply_preprocessing(raw_features, tft_layer):
    transformed_features = tft_layer(raw_features)
    return transformed_features

def _input_fn(
    file_pattern,
    data_accessor: tfx.components.DataAccessor,
    tf_transform_output: tft.TFTransformOutput,
    batch_size: int) -> tf.data.Dataset:
    """
    Generates features and label for tuning/training.
      Args:
        file_pattern: List of paths or patterns of input tfrecord files.
        data_accessor: DataAccessor for converting input to RecordBatch.
        tf_transform_output: A TFTransformOutput.
        batch_size: representing the number of consecutive elements of returned
          dataset to combine in a single batch
      Returns:
        A dataset that contains features where features is a
          dictionary of Tensors.
    """
    dataset = data_accessor.tf_dataset_factory(
        file_pattern,
        tfxio.TensorFlowDatasetOptions(batch_size = batch_size),
        tf_transform_output.transformed_metadata.schema
    )
    
    transform_layer = tf_transform_output.transform_features_layer()
    def apply_transform(raw_features):
        return _apply_preprocessing(raw_features, transform_layer)
    
    return dataset.map(apply_transform).repeat()

This differs from the _input_fn example given above because I am following the example from the next TFX tutorial, found here: https://www.tensorflow.org/tfx/tutorials/tfx/penguin_tft#run_fn

Also for reference, there is no target in the sample data, so no label_key is passed to the tfxio.TensorFlowDatasetOptions object.
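
For comparison, a minimal sketch of what the supervised case would look like (the 'target' column name here is hypothetical): with a label_key the factory yields (features_dict, label) tuples that model.fit accepts directly, while without one the dataset yields only the features dictionary.

# Hypothetical: only applicable if the data had a target column named 'target'
options = tfxio.TensorFlowDatasetOptions(
    batch_size = batch_size,
    label_key = 'target'  # the factory then yields (features_dict, label) tuples
)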

Error

When attempting to run the Trainer component with the TFX InteractiveContext object, I receive the following error.

ValueError: No gradients provided for any variable: ['dense_460/kernel:0', 'dense_460/bias:0', 'dense_461/kernel:0', 'dense_461/bias:0', 'dense_462/kernel:0', 'dense_462/bias:0', 'dense_463/kernel:0', 'dense_463/bias:0', 'dense_464/kernel:0', 'dense_464/bias:0', 'dense_465/kernel:0', 'dense_465/bias:0'].

From my own attempts to solve this, I believe the problem lies in the way the Autoencoder model is trained. In the Autoencoder example linked here https://www.tensorflow.org/tutorials/generative/autoencoder, the data is fitted like this:

autoencoder.fit(x_train, x_train,
                epochs=10,
                shuffle=True,
                validation_data=(x_test, x_test))

So the tf.Dataset should mimic this behaviour as well; when testing with plain Tensor objects, I have been able to recreate the error above and then resolve it by adding a target identical to the training data in the .fit() call.
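
A minimal sketch of that test, with illustrative shapes: when a compiled loss is never given a target, no loss is computed, so the optimizer receives no gradients; passing the inputs again as the target resolves it.

x_train = tf.random.uniform(shape = (100, 82))  # stand-in for the tabular data

test_model = keras.Sequential([
    keras.layers.Dense(8, activation = 'relu'),
    keras.layers.Dense(82, activation = 'sigmoid')
])
test_model.compile(optimizer = 'adam', loss = 'mae')

# test_model.fit(x_train)          # raises: No gradients provided for any variable
test_model.fit(x_train, x_train)   # works: the input doubles as the target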

Things I have tried so far

Duplicating the training dataset
    model.fit(
        train_dataset,
        train_dataset,
        steps_per_epoch = fn_args.train_steps,
        validation_data = eval_dataset,
        validation_steps = fn_args.eval_steps
    )

This raises an error because Keras does not accept a 'y' value when a dataset is passed as input.

ValueError: `y` argument is not supported when using dataset as input.
Returning a dataset that is itself a tuple
def _input_fn(...


    dataset = data_accessor.tf_dataset_factory(
        file_pattern,
        tfxio.TensorFlowDatasetOptions(batch_size = batch_size),
        tf_transform_output.transformed_metadata.schema
    )
    
    transform_layer = tf_transform_output.transform_features_layer()
    def apply_transform(raw_features):
        return _apply_preprocessing(raw_features, transform_layer)
    
    dataset = dataset.map(apply_transform)
    
    return dataset.map(lambda x: (x, x))

This raises an error where the keys in the features dictionary do not match the outputs of the model: when the target is a dictionary, Keras matches each key to a model output by name, and this model has only one output (here named 'dense_477').

ValueError: Found unexpected keys that do not correspond to any Model output: dict_keys(['feature_string', ...]). Expected: ['dense_477']

At this point I switched to using the subclassed keras.Model Autoencoder and tried to add output keys to the model, with outputs that I attempted to create dynamically in the same way as the inputs.

    def call(self, x):
        inputs = [keras.layers.Input(shape = (1,), name = f) for f in x]
        dense = keras.layers.concatenate(inputs)
        
        encoded = self.encoder(dense)
        decoded = self.decoder(encoded)
    
        outputs = {}
        for feature_name in x:
            outputs[feature_name] = keras.layers.Dense(1, activation = 'sigmoid')(decoded)

        return outputs

This raises the following error:

TypeError: Cannot convert a symbolic Keras input/output to a numpy array. This error may indicate that you're trying to pass a symbolic value to a NumPy call, which is not supported. Or, you may be trying to pass Keras symbolic inputs/outputs to a TF API that does not register dispatching, preventing Keras from automatically converting the API call to a lambda layer in the Functional Model.

I have kept digging into this, but I am no longer sure the data is being passed correctly, and I am starting to think I have been side-tracked from the actual problem.
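
(In hindsight, this TypeError occurs because call() creates new keras.layers.Input and Dense layers every time it runs; Keras layers need to be created once in __init__ and only applied in call(). A minimal sketch of that pattern with a dictionary output, using illustrative layer sizes:)

class DictOutputAutoencoder(keras.models.Model):
    def __init__(self, features):
        super().__init__()
        self.features = features
        # Create every layer exactly once, up front
        self.concat = keras.layers.Concatenate()
        self.encoder = keras.Sequential([keras.layers.Dense(8, activation = 'relu')])
        self.decoder = keras.Sequential([keras.layers.Dense(16, activation = 'relu')])
        self.heads = {f: keras.layers.Dense(1, activation = 'sigmoid') for f in features}

    def call(self, x):
        # x is the features dictionary yielded by the dataset
        dense = self.concat([x[f] for f in self.features])
        decoded = self.decoder(self.encoder(dense))
        return {f: head(decoded) for f, head in self.heads.items()}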

Answer

So I managed to find the answer to this, and I want to leave what I found here in case anyone else runs into a similar problem.

It turns out my feeling about the error was correct: the solution does lie in how the tf.Dataset object is presented.

This can be demonstrated with some code I ran that simulates the incoming data using randomly generated tensors.

tensors = [tf.random.uniform(shape = (1, 82)) for i in range(739)]
# This gives a list of 739 tensors, each holding a single row of 82 'features', simulating the dataset I had

dataset = tf.data.Dataset.from_tensor_slices(tensors)
dataset = dataset.map(lambda x : (x, x))
# This returns a dataset where each element pairs the training data with an
# identical target, which is what the Autoencoder model is looking for

model.fit(dataset ...) 

After that, I went on to do the same with the dataset returned by _input_fn. The TFX DataAccessor object returns a features_dict, however, so I needed to combine the tensors in that dictionary into a single tensor.

This is what my _input_fn looks like now:

from typing import Dict

def create_target_values(features_dict: Dict[str, tf.Tensor]) -> tuple:
    # Concatenate every feature tensor along the feature axis into one target tensor
    value_tensor = tf.concat(list(features_dict.values()), axis = 1)
    return (features_dict, value_tensor)

def _input_fn(
    file_pattern,
    data_accessor: tfx.components.DataAccessor,
    tf_transform_output: tft.TFTransformOutput,
    batch_size: int) -> tf.data.Dataset:
    """
    Generates features and label for tuning/training.
      Args:
        file_pattern: List of paths or patterns of input tfrecord files.
        data_accessor: DataAccessor for converting input to RecordBatch.
        tf_transform_output: A TFTransformOutput.
        batch_size: representing the number of consecutive elements of returned
          dataset to combine in a single batch
      Returns:
        A dataset that contains (features, target_tensor) tuple where features is a
          dictionary of Tensors, and target_tensor is a single Tensor that is a concatenated tensor of all the
          feature values.
    """
    dataset = data_accessor.tf_dataset_factory(
        file_pattern,
        tfxio.TensorFlowDatasetOptions(batch_size = batch_size),
        tf_transform_output.transformed_metadata.schema
    )
    
    dataset = dataset.map(lambda x: create_target_values(features_dict = x))
    return dataset.repeat()
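
With this change nothing else in run_fn needs to be touched: the dataset now yields (features_dict, target_tensor) tuples, so the fit call from earlier works as-is.

model.fit(
    train_dataset,
    steps_per_epoch = fn_args.train_steps,
    validation_data = eval_dataset,
    validation_steps = fn_args.eval_steps
)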