如何在 Estimator 训练期间动态加载数据集的新部分?

How to load new parts of Dataset dynamically during training of an Estimator?

我有一个有趣的问题。

我正在使用 tf.Estimator 对大型数据集(1500 万行,16 列)进行回归,我使用常用​​方法将数据加载到 tf.Dataset:

def input_fn_train(features, labels, batch_size, repeat_count):

    dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels))
    dataset = dataset.shuffle(len(labels)).repeat(repeat_count).batch(batch_size)
    return dataset

featureslabels 是 pandas 数据帧。 input_fn 适用于较小的数据(最多几百万行)但是当包括整个数据集时,它会提高:

[libprotobuf FATAL external/protobuf_archive/src/google/protobuf/message_lite.cc:68] CHECK failed: (byte_size_before_serialization) == (byte_size_after_serialization): tensorflow.GraphDef was modified concurrently during serialization. terminate called after throwing an instance of 'google::protobuf::FatalException' what(): CHECK failed: (byte_size_before_serialization) == (byte_size_after_serialization): tensorflow.GraphDef was modified concurrently during serialization.

这个错误的原因是当调用 .from_tensor_slices() 显式数据(而不是占位符)时,TensorFlow 为每个数据点创建 tf.constant()。 TensorFlow中图的大小有先天的限制,我的数据太大了。

在 tensorflow 文档中,他们提到了这一点,还提到了解决这个问题的方法:

"As an alternative, you can define the Dataset in terms of tf.placeholder() tensors, and feed the NumPy arrays when you initialize an Iterator over the dataset."

这种方法可以解决我的问题,但问题在于初始化,事实上,我无法访问它。当 运行 初始化数据集迭代器的操作时,无法将实际值提供给占位符。

使用以下钩子在 tf.Estimator 内部初始化数据集:

class _DatasetInitializerHook(training.SessionRunHook):

    def __init__(self, iterator):
        self._iterator = iterator

    def begin(self):
        self._initializer = self._iterator.initializer

    def after_create_session(self, session, coord):
        del coord
        session.run(self._initializer)

如您所见,它在创建会话后立即被调用。问题在于初始化会话 运行 独立于所有挂钩,因此不会在初始化会话 运行 上调用任何挂钩,因此无法将 feed_dict 传递给填充占位符。

我无法自己初始化迭代器,因为无法将迭代器传递给 Estimator。迭代器在

之后初始化

解决此问题的一种方法是将我的数据显式分离到 TFRecord 文件中,然后使用 TensorFlow 函数直接加载它们,但是,这是一个非常不受欢迎的解决方案。在我公司的代码库中,我们有自己优化的二进制数据格式,使用额外的文件会占用大量 space 和 IO 事务时间,这很关键。

我认为我的问题有多种解决方案,但是,我仍然没有想出任何解决方案。如果您有任何想法或建议,如何做到这一点,请分享,谢谢!

好的,我找到了解决问题的方法。可以使用 Dataset.from_generator() 函数来完成。我的解决方案使用一个生成器生成数据帧,第二个生成器在迭代这些数据帧时生成行。

a = arange(20).reshape(10,2)
df = DataFrame(a, columns=['x1','y1'])


def gen_partition():
    for i in range(2):
        df_partition = df.iloc[i * 5 : (i + 1) * 5]
        yield df_partition


def gen_fields():
    for partition in gen_partition(): # type: DataFrame
        for row in partition.itertuples():
            yield {'x1': row[1]}, row[2]


def input_fn_gen():
    dataset = Dataset.from_generator(
        gen_fields,
        ({'x1': tf.float32}, tf.float32),
        ({'x1': tf.TensorShape([])}, tf.TensorShape([])))

    dataset = dataset.shuffle(20).repeat(20).batch(2).prefetch(1)
    return dataset


feature_columns = [tf.feature_column.numeric_column('x1')]

dir = get_model_dir('linreg_test')

tf.logging.set_verbosity('INFO')

estimator = tf.estimator.LinearRegressor(
    feature_columns=feature_columns,
    model_dir=dir,
    label_dimension=1
)

estimator.train(input_fn=lambda: input_fn_gen())