TensorFlow 2: using keras.utils.Sequence as a data generator to train a machine learning model with use_multiprocessing raises an error

I want to run a test of training a machine learning model on CPU only, using just a Jupyter notebook on an EC2 instance.

The code uses TensorFlow 2.8. From the TF documentation for Model.fit, https://www.tensorflow.org/api_docs/python/tf/keras/Model#fit:

use_multiprocessing: Boolean. Used for generator or keras.utils.Sequence input only. If True, use process-based threading. If unspecified, use_multiprocessing will default to False. Note that because this implementation relies on multiprocessing, you should not pass non-picklable arguments to the generator as they can't be passed easily to children processes.
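
For reference, the contract that plays well with use_multiprocessing is that __len__ returns the number of batches per epoch and __getitem__(index) returns one picklable batch (e.g. NumPy arrays). A minimal sketch (class and variable names are illustrative, not from the code below):

import numpy as np
from tensorflow.keras.utils import Sequence

class NumpyBatchSequence(Sequence):
    """One in-memory NumPy batch per index, so the object pickles
    cleanly when Keras forks worker processes."""
    def __init__(self, x, y, batch_size=32):
        self.x, self.y, self.batch_size = x, y, batch_size

    def __len__(self):
        # number of batches per epoch
        return int(np.ceil(len(self.x) / self.batch_size))

    def __getitem__(self, index):
        s = slice(index * self.batch_size, (index + 1) * self.batch_size)
        return self.x[s], self.y[s]

# model.fit(NumpyBatchSequence(x, y), epochs=2,
#           workers=4, use_multiprocessing=True)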

I am trying to enable use_multiprocessing for model.fit().

import numpy as np
import psutil
import tensorflow.keras as keras
import tensorflow as tf
from tensorflow.keras.utils import Sequence

feature_format = {
    "a": tf.io.FixedLenFeature((1,), dtype=tf.int8),
    "b": tf.io.FixedLenFeature((1,), dtype=tf.int8),
}

label_format = {
    "label": tf.io.FixedLenFeature((1,), dtype=tf.int64, default_value=(0,))
}

class DataGenerator(Sequence):
    def __init__(self):
        features = {'a': tf.constant([1, 2, 3]),
                    'b': tf.constant([-1, -2, -3]),
                    'labels': tf.constant([1, 1, 0])}
        feature_dataset = tf.data.Dataset.from_tensors(features)

        tf.print(f"type{feature_dataset}")
        for x in feature_dataset:
            print(x['a'])

        def _process(input_dataset):
            features = tf.io.parse_single_example(input_dataset, feature_format)
            labels = tf.io.parse_single_example(input_dataset, label_format)
            return (features, labels)

        feature_dataset = feature_dataset.shuffle(1)
        feature_dataset = feature_dataset.map(_process)
        feature_dataset = feature_dataset.cache()
        feature_dataset = feature_dataset.repeat(2)
        feature_dataset = feature_dataset.batch(1)
        self.feature_dataset = feature_dataset.prefetch(1)
        self.data_size = 3
        self.batch_size = 1

    def __len__(self):
        return np.math.ceil(self.data_size / self.batch_size)

    def __getitem__(self, index):
        return self.feature_dataset


class MyModel(keras.Model):

    def __init__(self):
        super().__init__()
        self.embedding_layer1 = keras.layers.Embedding(10, 32)
        self.embedding_layer2 = keras.layers.Embedding(10, 32)

    # the input data needs to be accessed by column name,
    # so the dataset is created with a dictionary.
    def call(self, input_dataset, training=True):
        f1_data, f2_data = {}, {}
        for col in ['a']:
            f1_data[col] = input_dataset(col)
        for col in ['b']:
            f2_data[col] = input_dataset(col)

        f1_out = self.embedding_layer1(f1_data)
        f2_out = self.embedding_layer2(f2_data)
        result = tf.reduce_sum(tf.multiply(f1_out, f2_out))
        return result

model = MyModel()
dg = DataGenerator()
model.fit(dg,
          epochs=2,
          workers=psutil.cpu_count(),
          use_multiprocessing=True
          )

TypeError: in user code:

    features = tf.io.parse_single_example(input_dataset, feature_format)

TypeError: Expected any non-tensor type, but got a tensor instead.

How can I build a dataset that parse_single_example() will accept? Currently the dataset is a TensorDataset where each tensor can be accessed by a key. Thanks.
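
For reference, tf.io.parse_single_example expects a scalar tf.string holding a serialized tf.train.Example, not a dict of tensors. A minimal sketch with made-up values:

import tensorflow as tf

example = tf.train.Example(features=tf.train.Features(feature={
    'a': tf.train.Feature(int64_list=tf.train.Int64List(value=[1])),
}))
serialized = example.SerializeToString()  # scalar protobuf string
parsed = tf.io.parse_single_example(
    serialized, {'a': tf.io.FixedLenFeature((1,), dtype=tf.int64)})
print(parsed['a'])  # tf.Tensor([1], shape=(1,), dtype=int64)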

Update

I tried the answer below. However, it failed with an error.

import psutil
import numpy as np

class MyModel(keras.Model):

    def __init__(self):
        super().__init__()
        self.embedding_layer1 = keras.layers.Embedding(10, 32)
        self.embedding_layer2 = keras.layers.Embedding(10, 32)

    # the input data needs to be accessed by feature name
    def call(self, input_dataset, training=True):
        f1_data, f2_data = {}, {}
        for col in ['a']:
            f1_data[col] = input_dataset[col] # error !
        for col in ['b']:
            f2_data[col] = input_dataset[col]

        f1_out = self.embedding_layer1(f1_data)
        f2_out = self.embedding_layer2(f2_data)
        result = tf.reduce_sum(tf.multiply(f1_out, f2_out))
        return result

model = MyModel()
dg = DataGenerator()

model.compile(optimizer=tf.optimizers.Adam(learning_rate=0.001),
              loss=tf.keras.losses.BinaryCrossentropy(),
              run_eagerly=False,
              metrics=[keras.metrics.BinaryAccuracy()]
              )

model.fit(dg,
          epochs=2,
          workers=psutil.cpu_count(),
          use_multiprocessing=True
          )

TypeError: Exception encountered when calling layer "my_model_7" (type MyModel).

'PrefetchDataset' object is not subscriptable

Call arguments received:
  • input_dataset=<PrefetchDataset element_spec=({'a': TensorSpec(shape=(None, 1), dtype=tf.int64, name=None), 'b': TensorSpec(shape=(None, 1), dtype=tf.int64, name=None)}, {'label': TensorSpec(shape=(None, 1), dtype=tf.int64, name=None)})>
  • training=False
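
This second error is expected: a tf.data PrefetchDataset cannot be indexed, and Keras feeds whatever __getitem__ returns straight into call(). One possible workaround (a sketch, assuming the small pipeline above fits in memory) is to materialize the batches so that __getitem__ returns one (features, labels) pair per index:

class DataGenerator(Sequence):
    def __init__(self):
        # ... build the feature_dataset pipeline exactly as before ...
        # then materialize it so each index maps to one batch:
        self.batches = list(self.feature_dataset.as_numpy_iterator())

    def __len__(self):
        return len(self.batches)

    def __getitem__(self, index):
        return self.batches[index]  # one (features, labels) batch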

If you want to parse / deserialize a tf.train.Example, you first have to serialize it by creating a protobuf string. Otherwise it does not make sense. Here is an example:

import numpy as np
import tensorflow.keras as keras
import tensorflow as tf
from tensorflow.keras.utils import Sequence

feature_format = {"a": tf.io.FixedLenFeature((1,), dtype=tf.int64),
                  "b": tf.io.FixedLenFeature((1,), dtype=tf.int64)}

label_format = {"label": tf.io.FixedLenFeature((1,), dtype=tf.int64, default_value=(0,))}

def _int64_feature(value):
  return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

def _process(input_dataset):
    features = tf.io.parse_single_example(input_dataset, feature_format)
    labels = tf.io.parse_single_example(input_dataset, label_format)
    return features, labels

def serialize_example(a, b, labels):
  # The key must match label_format ('label'); otherwise parsing silently
  # falls back to the default value of 0.
  feature = {
      'a': _int64_feature(a.numpy()),
      'b': _int64_feature(b.numpy()),
      'label': _int64_feature(labels.numpy()),
  }
  example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
  return example_proto.SerializeToString()

class DataGenerator(Sequence):
    def __init__(self):

        feature_dataset = tf.data.Dataset.from_tensor_slices(([1,2,3], [-1,-2,-3], [1,1,0]))
        feature_dataset = feature_dataset.shuffle(1)
        feature_dataset = feature_dataset.map(lambda a, b, l: tf.py_function(func=serialize_example, inp=[a, b, l], Tout=tf.string))
        feature_dataset = feature_dataset.map(_process)
        feature_dataset = feature_dataset.cache()
        feature_dataset = feature_dataset.repeat(2)
        feature_dataset = feature_dataset.batch(1)
        self.feature_dataset = feature_dataset.prefetch(1)
        self.data_size = 3
        self.batch_size = 1
        tf.print(f"type{feature_dataset}")
        for data, labels in feature_dataset:
            print(data['a'])
    
    def __len__(self):
        return np.math.ceil(self.data_size/self.batch_size)

    def __getitem__(self, index):
        return self.feature_dataset
        
dg = DataGenerator()
type<BatchDataset element_spec=({'a': TensorSpec(shape=(None, 1), dtype=tf.int64, name=None), 'b': TensorSpec(shape=(None, 1), dtype=tf.int64, name=None)}, {'label': TensorSpec(shape=(None, 1), dtype=tf.int64, name=None)})>
tf.Tensor([[1]], shape=(1, 1), dtype=int64)
tf.Tensor([[2]], shape=(1, 1), dtype=int64)
tf.Tensor([[3]], shape=(1, 1), dtype=int64)
tf.Tensor([[1]], shape=(1, 1), dtype=int64)
tf.Tensor([[2]], shape=(1, 1), dtype=int64)
tf.Tensor([[3]], shape=(1, 1), dtype=int64)
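
As a side note (not part of the original answer): since the pipeline is already a tf.data.Dataset, one can also skip the Sequence wrapper and pass the dataset to model.fit directly, letting tf.data handle parallelism via num_parallel_calls/prefetch instead of use_multiprocessing. A hedged sketch:

dg = DataGenerator()
model = MyModel()
model.compile(optimizer=tf.optimizers.Adam(learning_rate=0.001),
              loss=tf.keras.losses.BinaryCrossentropy())
# fit consumes the (features, labels) batches yielded by the dataset
model.fit(dg.feature_dataset, epochs=2)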