如何缓存和迭代未知大小的数据集?
How to cache and iterate through a Dataset of unknown size?
在将 .cache()
步骤添加到我的数据集管道时,连续的训练时期仍会从网络存储中下载数据。
我在网络存储上有一个数据集。我想缓存它,但不要重复它:训练时期必须 运行 遍历整个数据集。
这是我的数据集构建管道:
return tf.data.Dataset.list_files(
file_pattern
).interleave(
tf.data.TFRecordDataset,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).shuffle(
buffer_size=2048
).batch(
batch_size=2048,
drop_remainder=True,
).cache(
).map(
map_func=_parse_example_batch,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).prefetch(
buffer_size=32
)
如果我按原样使用,数据集会在每个时期下载。为避免这种情况,我必须将 .repeat()
步骤添加到管道并使用 model.fit
函数的 steps_per_epoch
关键字。但是,我不知道完整数据集的大小,因此无法传递正确的 steps_per_epoch
值。
缓存和使用未知大小的数据集的正确方法是什么?
谢谢。
编辑
在阅读一些 TF 代码时,我(重新)发现了 make_initializable_iterator
。看来这正是我要找的,也就是说通过同一个数据集迭代多次(在第一次迭代后利用缓存)。但是,这已被弃用,不再是 TF2.
中主要 API 的一部分
更新指令是用for ... in dataset
手动迭代数据集。不就是keras.Model.fit
函数做的吗?我必须手动编写训练循环以获得缓存优势吗?
亲切
在TF2.0中,您不需要.repeat()
。通过
successives training epochs still download the data from the network storage.
我认为您对消息 filling up shuffle buffer
感到困惑。如果您使用 shuffle()
函数,这会在每个纪元之前发生。也许不使用 shuffle()
试试看有什么不同。
另外,我建议您在 map()
之后和 batch()
.
之前使用 cache()
编辑
filling up shuffle buffer
是您在使用 shuffle
函数时收到的消息。使用 cache()
后,您仍然可以 shuffle()
数据集。看here
另外,如果我理解正确的话,你正在将 map()
的结果数据集提供给你的模型进行训练,那么你应该 cache()
这个数据集而不是另一个数据集,因为训练将以此为基础进行。
要计算数据集中的元素数量,您可以使用以下代码
num_elements = 0
for element in dataset: # tf.dataset type
num_elements += 1
print ('Total number of elements in the file: ',num_elements)
现在,通过将 num_elements
与您的 batch_size
相结合,您将获得 steps_per_epoch
好消息!最终 v2.0.0 版本修复了此行为。
下面是突出显示不同行为的代码片段。
import time
import tensorflow as tf
import tensorflow.keras as keras
# Simple layer that just print its inputs
class Print(keras.layers.Layer):
def compute_output_signature(self, input_signature):
return input_signature
def call(self, inputs, **kwargs):
tf.print(inputs)
return inputs
# Generator returning incremented values each time it is re-initialized
generator_list = [0]
def generator():
v = generator_list[-1]
generator_list.append(v+1)
tf.print("Generating samples with value {}".format(v))
time.sleep(2)
for i in range(2):
yield (tf.constant([v]), tf.constant(v))
def main():
model_input = keras.layers.Input(shape=(1,))
model_output = Print()(model_input)
model = keras.Model(inputs=model_input, outputs=model_output)
model.compile("adam", loss="mae")
ds = tf.data.Dataset.from_generator(
generator, (tf.int64, tf.int64), ([1], [])
)
cached_ds = ds.cache()
tf.print("Fit")
model.fit(
cached_ds,
epochs=3,
verbose=2
)
tf.print("For ... in ...")
for i in range(3):
for x, y in cached_ds:
model(x)
if __name__ == '__main__':
main()
使用 tensorflow 2.0.0-b1(在 Google AI 平台上使用),输出如下:
Fit
Epoch 1/3
Generating samples with value 0
# sleep 2s
2019-10-03 15:45:32.718522: W tensorflow/compiler/jit/mark_for_compilation_pass.cc:1483] (One-time warning): Not using XLA:CPU for cluster because envvar TF_XLA_FLAGS=--tf_xla_cpu_global_jit was not set. If you want XLA:CPU, either set that envvar, or use experimental_jit_scope to enable XLA:CPU. To confirm that XLA is active, pass --vmodule=xla_compilation_cache=1 (as a proper command-line flag, not via TF_XLA_FLAGS) or set the envvar XLA_FLAGS=--xla_hlo_profile.
[[0]]
[[0]]
2/2 - 2s - loss: 0.0000e+00
Generating samples with value 1
# sleep 2s
Epoch 2/3
[[1]]
[[1]]
2/2 - 2s - loss: 0.0000e+00
Epoch 3/3
2019-10-03 15:45:34.774195: W tensorflow/core/kernels/data/cache_dataset_ops.cc:815] The calling iterator did not fully read the dataset being cached. In order to avoid unexpected truncation of the dataset, the partially cached contents of the dataset will be discarded. This can happen if you have an input pipeline similar to `dataset.cache().take(k).repeat()`. You should use `dataset.take(k).cache().repeat()` instead.
Generating samples with value 2
# sleep 2s
[[2]]
[[2]]
2019-10-03 15:45:36.782046: W tensorflow/core/kernels/data/cache_dataset_ops.cc:815] The calling iterator did not fully read the dataset being cached. In order to avoid unexpected truncation of the dataset, the partially cached contents of the dataset will be discarded. This can happen if you have an input pipeline similar to `dataset.cache().take(k).repeat()`. You should use `dataset.take(k).cache().repeat()` instead.
2/2 - 2s - loss: 0.0000e+00
For ... in ...
Generating samples with value 3
# sleep 2s
[3]
[3]
Generating samples with value 4
# sleep 2s
[4]
[4]
Generating samples with value 5
# sleep 2s
[5]
[5]
可以看到,张量的值在每个epoch递增,每次都执行sleep指令。此外,我们收到有关截断迭代器的警告...
现在,使用 tensorflow 2.0.0:
Fit
Epoch 1/3
WARNING:tensorflow:The list of trainable weights is empty. Make sure that you are not setting model.trainable to False before compiling the model.
Generating samples with value 0
# sleep 2s
[[0]]
[[0]]
2019-10-03 15:49:59.587796: W tensorflow/core/common_runtime/base_collective_executor.cc:216] BaseCollectiveExecutor::StartAbort Out of range: End of sequence
[[{{node IteratorGetNext}}]]
2/2 - 2s - loss: 0.0000e+00
Epoch 2/3
[[0]]
[[0]]
2019-10-03 15:49:59.598144: W tensorflow/core/common_runtime/base_collective_executor.cc:216] BaseCollectiveExecutor::StartAbort Out of range: End of sequence
[[{{node IteratorGetNext}}]]
2/2 - 0s - loss: 0.0000e+00
Epoch 3/3
[[0]]
[[0]]
2019-10-03 15:49:59.605260: W tensorflow/core/common_runtime/base_collective_executor.cc:216] BaseCollectiveExecutor::StartAbort Out of range: End of sequence
[[{{node IteratorGetNext}}]]
For ... in ...
2/2 - 0s - loss: 0.0000e+00
[0]
[0]
[0]
[0]
[0]
[0]
还有'Voila'!生成器函数只执行一次,不再休眠,张量的值始终相同。我只是有一些关于序列结束的警告,但我可以支持它!
亲切
在将 .cache()
步骤添加到我的数据集管道时,连续的训练时期仍会从网络存储中下载数据。
我在网络存储上有一个数据集。我想缓存它,但不要重复它:训练时期必须 运行 遍历整个数据集。 这是我的数据集构建管道:
return tf.data.Dataset.list_files(
file_pattern
).interleave(
tf.data.TFRecordDataset,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).shuffle(
buffer_size=2048
).batch(
batch_size=2048,
drop_remainder=True,
).cache(
).map(
map_func=_parse_example_batch,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).prefetch(
buffer_size=32
)
如果我按原样使用,数据集会在每个时期下载。为避免这种情况,我必须将 .repeat()
步骤添加到管道并使用 model.fit
函数的 steps_per_epoch
关键字。但是,我不知道完整数据集的大小,因此无法传递正确的 steps_per_epoch
值。
缓存和使用未知大小的数据集的正确方法是什么?
谢谢。
编辑
在阅读一些 TF 代码时,我(重新)发现了 make_initializable_iterator
。看来这正是我要找的,也就是说通过同一个数据集迭代多次(在第一次迭代后利用缓存)。但是,这已被弃用,不再是 TF2.
更新指令是用for ... in dataset
手动迭代数据集。不就是keras.Model.fit
函数做的吗?我必须手动编写训练循环以获得缓存优势吗?
亲切
在TF2.0中,您不需要.repeat()
。通过
successives training epochs still download the data from the network storage.
我认为您对消息 filling up shuffle buffer
感到困惑。如果您使用 shuffle()
函数,这会在每个纪元之前发生。也许不使用 shuffle()
试试看有什么不同。
另外,我建议您在 map()
之后和 batch()
.
cache()
编辑
filling up shuffle buffer
是您在使用 shuffle
函数时收到的消息。使用 cache()
后,您仍然可以 shuffle()
数据集。看here
另外,如果我理解正确的话,你正在将 map()
的结果数据集提供给你的模型进行训练,那么你应该 cache()
这个数据集而不是另一个数据集,因为训练将以此为基础进行。
要计算数据集中的元素数量,您可以使用以下代码
num_elements = 0
for element in dataset: # tf.dataset type
num_elements += 1
print ('Total number of elements in the file: ',num_elements)
现在,通过将 num_elements
与您的 batch_size
相结合,您将获得 steps_per_epoch
好消息!最终 v2.0.0 版本修复了此行为。
下面是突出显示不同行为的代码片段。
import time
import tensorflow as tf
import tensorflow.keras as keras
# Simple layer that just print its inputs
class Print(keras.layers.Layer):
def compute_output_signature(self, input_signature):
return input_signature
def call(self, inputs, **kwargs):
tf.print(inputs)
return inputs
# Generator returning incremented values each time it is re-initialized
generator_list = [0]
def generator():
v = generator_list[-1]
generator_list.append(v+1)
tf.print("Generating samples with value {}".format(v))
time.sleep(2)
for i in range(2):
yield (tf.constant([v]), tf.constant(v))
def main():
model_input = keras.layers.Input(shape=(1,))
model_output = Print()(model_input)
model = keras.Model(inputs=model_input, outputs=model_output)
model.compile("adam", loss="mae")
ds = tf.data.Dataset.from_generator(
generator, (tf.int64, tf.int64), ([1], [])
)
cached_ds = ds.cache()
tf.print("Fit")
model.fit(
cached_ds,
epochs=3,
verbose=2
)
tf.print("For ... in ...")
for i in range(3):
for x, y in cached_ds:
model(x)
if __name__ == '__main__':
main()
使用 tensorflow 2.0.0-b1(在 Google AI 平台上使用),输出如下:
Fit
Epoch 1/3
Generating samples with value 0
# sleep 2s
2019-10-03 15:45:32.718522: W tensorflow/compiler/jit/mark_for_compilation_pass.cc:1483] (One-time warning): Not using XLA:CPU for cluster because envvar TF_XLA_FLAGS=--tf_xla_cpu_global_jit was not set. If you want XLA:CPU, either set that envvar, or use experimental_jit_scope to enable XLA:CPU. To confirm that XLA is active, pass --vmodule=xla_compilation_cache=1 (as a proper command-line flag, not via TF_XLA_FLAGS) or set the envvar XLA_FLAGS=--xla_hlo_profile.
[[0]]
[[0]]
2/2 - 2s - loss: 0.0000e+00
Generating samples with value 1
# sleep 2s
Epoch 2/3
[[1]]
[[1]]
2/2 - 2s - loss: 0.0000e+00
Epoch 3/3
2019-10-03 15:45:34.774195: W tensorflow/core/kernels/data/cache_dataset_ops.cc:815] The calling iterator did not fully read the dataset being cached. In order to avoid unexpected truncation of the dataset, the partially cached contents of the dataset will be discarded. This can happen if you have an input pipeline similar to `dataset.cache().take(k).repeat()`. You should use `dataset.take(k).cache().repeat()` instead.
Generating samples with value 2
# sleep 2s
[[2]]
[[2]]
2019-10-03 15:45:36.782046: W tensorflow/core/kernels/data/cache_dataset_ops.cc:815] The calling iterator did not fully read the dataset being cached. In order to avoid unexpected truncation of the dataset, the partially cached contents of the dataset will be discarded. This can happen if you have an input pipeline similar to `dataset.cache().take(k).repeat()`. You should use `dataset.take(k).cache().repeat()` instead.
2/2 - 2s - loss: 0.0000e+00
For ... in ...
Generating samples with value 3
# sleep 2s
[3]
[3]
Generating samples with value 4
# sleep 2s
[4]
[4]
Generating samples with value 5
# sleep 2s
[5]
[5]
可以看到,张量的值在每个epoch递增,每次都执行sleep指令。此外,我们收到有关截断迭代器的警告...
现在,使用 tensorflow 2.0.0:
Fit
Epoch 1/3
WARNING:tensorflow:The list of trainable weights is empty. Make sure that you are not setting model.trainable to False before compiling the model.
Generating samples with value 0
# sleep 2s
[[0]]
[[0]]
2019-10-03 15:49:59.587796: W tensorflow/core/common_runtime/base_collective_executor.cc:216] BaseCollectiveExecutor::StartAbort Out of range: End of sequence
[[{{node IteratorGetNext}}]]
2/2 - 2s - loss: 0.0000e+00
Epoch 2/3
[[0]]
[[0]]
2019-10-03 15:49:59.598144: W tensorflow/core/common_runtime/base_collective_executor.cc:216] BaseCollectiveExecutor::StartAbort Out of range: End of sequence
[[{{node IteratorGetNext}}]]
2/2 - 0s - loss: 0.0000e+00
Epoch 3/3
[[0]]
[[0]]
2019-10-03 15:49:59.605260: W tensorflow/core/common_runtime/base_collective_executor.cc:216] BaseCollectiveExecutor::StartAbort Out of range: End of sequence
[[{{node IteratorGetNext}}]]
For ... in ...
2/2 - 0s - loss: 0.0000e+00
[0]
[0]
[0]
[0]
[0]
[0]
还有'Voila'!生成器函数只执行一次,不再休眠,张量的值始终相同。我只是有一些关于序列结束的警告,但我可以支持它!
亲切