如何在 Estimator 训练期间动态加载数据集的新部分?
How to load new parts of Dataset dynamically during training of an Estimator?
我有一个有趣的问题。
我正在使用 tf.Estimator
对大型数据集(1500 万行,16 列)进行回归,我使用常用方法将数据加载到 tf.Dataset
:
def input_fn_train(features, labels, batch_size, repeat_count):
dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels))
dataset = dataset.shuffle(len(labels)).repeat(repeat_count).batch(batch_size)
return dataset
features
和 labels
是 pandas 数据帧。 input_fn
适用于较小的数据(最多几百万行)但是当包括整个数据集时,它会提高:
[libprotobuf FATAL external/protobuf_archive/src/google/protobuf/message_lite.cc:68] CHECK failed: (byte_size_before_serialization) == (byte_size_after_serialization): tensorflow.GraphDef was modified concurrently during serialization. terminate called after throwing an instance of 'google::protobuf::FatalException' what(): CHECK failed: (byte_size_before_serialization) == (byte_size_after_serialization): tensorflow.GraphDef was modified concurrently during serialization.
这个错误的原因是当调用 .from_tensor_slices()
显式数据(而不是占位符)时,TensorFlow 为每个数据点创建 tf.constant()
。 TensorFlow中图的大小有先天的限制,我的数据太大了。
在 tensorflow 文档中,他们提到了这一点,还提到了解决这个问题的方法:
"As an alternative, you can define the Dataset in terms of tf.placeholder()
tensors, and feed the NumPy arrays when you initialize an Iterator over the dataset."
这种方法可以解决我的问题,但问题在于初始化,事实上,我无法访问它。当 运行 初始化数据集迭代器的操作时,无法将实际值提供给占位符。
使用以下钩子在 tf.Estimator
内部初始化数据集:
class _DatasetInitializerHook(training.SessionRunHook):
def __init__(self, iterator):
self._iterator = iterator
def begin(self):
self._initializer = self._iterator.initializer
def after_create_session(self, session, coord):
del coord
session.run(self._initializer)
如您所见,它在创建会话后立即被调用。问题在于初始化会话 运行 独立于所有挂钩,因此不会在初始化会话 运行 上调用任何挂钩,因此无法将 feed_dict
传递给填充占位符。
我无法自己初始化迭代器,因为无法将迭代器传递给 Estimator
。迭代器在
之后初始化
解决此问题的一种方法是将我的数据显式分离到 TFRecord
文件中,然后使用 TensorFlow 函数直接加载它们,但是,这是一个非常不受欢迎的解决方案。在我公司的代码库中,我们有自己优化的二进制数据格式,使用额外的文件会占用大量 space 和 IO 事务时间,这很关键。
我认为我的问题有多种解决方案,但是,我仍然没有想出任何解决方案。如果您有任何想法或建议,如何做到这一点,请分享,谢谢!
好的,我找到了解决问题的方法。可以使用 Dataset.from_generator()
函数来完成。我的解决方案使用一个生成器生成数据帧,第二个生成器在迭代这些数据帧时生成行。
a = arange(20).reshape(10,2)
df = DataFrame(a, columns=['x1','y1'])
def gen_partition():
for i in range(2):
df_partition = df.iloc[i * 5 : (i + 1) * 5]
yield df_partition
def gen_fields():
for partition in gen_partition(): # type: DataFrame
for row in partition.itertuples():
yield {'x1': row[1]}, row[2]
def input_fn_gen():
dataset = Dataset.from_generator(
gen_fields,
({'x1': tf.float32}, tf.float32),
({'x1': tf.TensorShape([])}, tf.TensorShape([])))
dataset = dataset.shuffle(20).repeat(20).batch(2).prefetch(1)
return dataset
feature_columns = [tf.feature_column.numeric_column('x1')]
dir = get_model_dir('linreg_test')
tf.logging.set_verbosity('INFO')
estimator = tf.estimator.LinearRegressor(
feature_columns=feature_columns,
model_dir=dir,
label_dimension=1
)
estimator.train(input_fn=lambda: input_fn_gen())
我有一个有趣的问题。
我正在使用 tf.Estimator
对大型数据集(1500 万行,16 列)进行回归,我使用常用方法将数据加载到 tf.Dataset
:
def input_fn_train(features, labels, batch_size, repeat_count):
dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels))
dataset = dataset.shuffle(len(labels)).repeat(repeat_count).batch(batch_size)
return dataset
features
和 labels
是 pandas 数据帧。 input_fn
适用于较小的数据(最多几百万行)但是当包括整个数据集时,它会提高:
[libprotobuf FATAL external/protobuf_archive/src/google/protobuf/message_lite.cc:68] CHECK failed: (byte_size_before_serialization) == (byte_size_after_serialization): tensorflow.GraphDef was modified concurrently during serialization. terminate called after throwing an instance of 'google::protobuf::FatalException' what(): CHECK failed: (byte_size_before_serialization) == (byte_size_after_serialization): tensorflow.GraphDef was modified concurrently during serialization.
这个错误的原因是当调用 .from_tensor_slices()
显式数据(而不是占位符)时,TensorFlow 为每个数据点创建 tf.constant()
。 TensorFlow中图的大小有先天的限制,我的数据太大了。
在 tensorflow 文档中,他们提到了这一点,还提到了解决这个问题的方法:
"As an alternative, you can define the Dataset in terms of tf.placeholder()
tensors, and feed the NumPy arrays when you initialize an Iterator over the dataset."
这种方法可以解决我的问题,但问题在于初始化,事实上,我无法访问它。当 运行 初始化数据集迭代器的操作时,无法将实际值提供给占位符。
使用以下钩子在 tf.Estimator
内部初始化数据集:
class _DatasetInitializerHook(training.SessionRunHook):
def __init__(self, iterator):
self._iterator = iterator
def begin(self):
self._initializer = self._iterator.initializer
def after_create_session(self, session, coord):
del coord
session.run(self._initializer)
如您所见,它在创建会话后立即被调用。问题在于初始化会话 运行 独立于所有挂钩,因此不会在初始化会话 运行 上调用任何挂钩,因此无法将 feed_dict
传递给填充占位符。
我无法自己初始化迭代器,因为无法将迭代器传递给 Estimator
。迭代器在
解决此问题的一种方法是将我的数据显式分离到 TFRecord
文件中,然后使用 TensorFlow 函数直接加载它们,但是,这是一个非常不受欢迎的解决方案。在我公司的代码库中,我们有自己优化的二进制数据格式,使用额外的文件会占用大量 space 和 IO 事务时间,这很关键。
我认为我的问题有多种解决方案,但是,我仍然没有想出任何解决方案。如果您有任何想法或建议,如何做到这一点,请分享,谢谢!
好的,我找到了解决问题的方法。可以使用 Dataset.from_generator()
函数来完成。我的解决方案使用一个生成器生成数据帧,第二个生成器在迭代这些数据帧时生成行。
a = arange(20).reshape(10,2)
df = DataFrame(a, columns=['x1','y1'])
def gen_partition():
for i in range(2):
df_partition = df.iloc[i * 5 : (i + 1) * 5]
yield df_partition
def gen_fields():
for partition in gen_partition(): # type: DataFrame
for row in partition.itertuples():
yield {'x1': row[1]}, row[2]
def input_fn_gen():
dataset = Dataset.from_generator(
gen_fields,
({'x1': tf.float32}, tf.float32),
({'x1': tf.TensorShape([])}, tf.TensorShape([])))
dataset = dataset.shuffle(20).repeat(20).batch(2).prefetch(1)
return dataset
feature_columns = [tf.feature_column.numeric_column('x1')]
dir = get_model_dir('linreg_test')
tf.logging.set_verbosity('INFO')
estimator = tf.estimator.LinearRegressor(
feature_columns=feature_columns,
model_dir=dir,
label_dimension=1
)
estimator.train(input_fn=lambda: input_fn_gen())