升级到 tf.dataset 解析 csv 时无法正常工作
Upgrade to tf.dataset not working properly when parsing csv
我有一个 GCMLE 实验,我正在尝试升级我的 input_fn
以使用新的 tf.data
功能。我基于此 sample
创建了以下 input_fn
def input_fn(...):
dataset = tf.data.Dataset.list_files(filenames).shuffle(num_shards) # shuffle up the list of input files
dataset = dataset.interleave(lambda filename: # mix together records from cycle_length number of shards
tf.data.TextLineDataset(filename).skip(1).map(lambda row: parse_csv(row, hparams)), cycle_length=5)
if shuffle:
dataset = dataset.shuffle(buffer_size = 10000)
dataset = dataset.repeat(num_epochs)
dataset = dataset.batch(batch_size)
iterator = dataset.make_one_shot_iterator()
features = iterator.get_next()
labels = features.pop(LABEL_COLUMN)
return features, labels
我的parse_csv
和我之前用的是一样的,但是现在不能用了。我可以解决一些问题,但我不完全理解 为什么 我会遇到这些问题。这是我的 parse_csv() 函数
的开始
def parse_csv(..):
columns = tf.decode_csv(rows, record_defaults=CSV_COLUMN_DEFAULTS)
raw_features = dict(zip(FIELDNAMES, columns))
words = tf.string_split(raw_features['sentences']) # splitting words
vocab_table = tf.contrib.lookup.index_table_from_file(vocabulary_file = hparams.vocab_file,
default_value = 0)
....
马上这个 tf.string_split()
停止工作并且错误是 ValueError: Shape must be rank 1 but is rank 0 for 'csv_preprocessing/input_sequence_generation/StringSplit' (op: 'StringSplit') with input shapes: [], [].
- 这很容易通过 [=21= 将 raw_features['sentences']
打包到张量中来解决] 但我不明白为什么 this dataset
方法需要这样做?为什么在旧版本中可以正常工作?为了使形状与我的模型的其余部分相匹配,我最终需要通过 words = tf.squeeze(words, 0)
在最后删除这个额外的维度,因为我将这个 "unecessary" 维度添加到张量中。
无论出于何种原因,我也收到 table 未初始化的错误 tensorflow.python.framework.errors_impl.FailedPreconditionError: Table not initialized.
但是,此代码完全适用于我的旧 input_fn()
(见下文)所以我不知道为什么我现在需要初始化 tables?我还没有想出这部分的解决方案。在我的 parse_csv 函数中我是否缺少能够使用 tf.contrib.lookup.index_table_from_file
的东西?
作为参考,这是我的旧 input_fn() 仍然有效:
def input_fn(...):
filename_queue = tf.train.string_input_producer(tf.train.match_filenames_once(filenames),
num_epochs=num_epochs, shuffle=shuffle, capacity=32)
reader = tf.TextLineReader(skip_header_lines=skip_header_lines)
_, rows = reader.read_up_to(filename_queue, num_records=batch_size)
features = parse_csv(rows, hparams)
if shuffle:
features = tf.train.shuffle_batch(
features,
batch_size,
min_after_dequeue=2 * batch_size + 1,
capacity=batch_size * 10,
num_threads=multiprocessing.cpu_count(),
enqueue_many=True,
allow_smaller_final_batch=True
)
else:
features = tf.train.batch(
features,
batch_size,
capacity=batch_size * 10,
num_threads=multiprocessing.cpu_count(),
enqueue_many=True,
allow_smaller_final_batch=True
)
labels = features.pop(LABEL_COLUMN)
return features, labels
更新 TF 1.7
我正在使用 TF 1.7(它应该具有@mrry 回答中提到的所有 TF 1.6 功能)重新访问它,但我仍然无法复制该行为。对于我的旧 input_fn()
,我可以得到大约 13 steps/sec。我使用的新功能如下:
def input_fn(...):
files = tf.data.Dataset.list_files(filenames).shuffle(num_shards)
dataset = files.apply(tf.contrib.data.parallel_interleave(lambda filename: tf.data.TextLineDataset(filename).skip(1), cycle_length=num_shards))
dataset = dataset.apply(tf.contrib.data.map_and_batch(lambda row:
parse_csv_dataset(row, hparams = hparams),
batch_size = batch_size,
num_parallel_batches = multiprocessing.cpu_count()))
dataset = dataset.prefetch(1)
if shuffle:
dataset = dataset.shuffle(buffer_size = 10000)
dataset = dataset.repeat(num_epochs)
iterator = dataset.make_initializable_iterator()
features = iterator.get_next()
tf.add_to_collection(tf.GraphKeys.TABLE_INITIALIZERS, iterator.initializer)
labels = {key: features.pop(key) for key in LABEL_COLUMNS}
return features, labels
我相信我正在关注所有 performance guildines,例如 1) 使用预取 2) 使用 map_and_batch 和 num_parallel_batches = 核心 3) 使用 parallel_interleave 4 ) 在重复之前应用随机播放。我没有使用的唯一步骤是缓存建议,但希望它真的只对第一个以及 "applying interleave, prefetch and shuffle first." 之后的时期有帮助——但是我发现在 [=87= 之后进行预取和随机播放] 大约有 10% 的加速。
缓冲区问题
我注意到的第一个性能问题是我的旧 input_fn()
花了我大约 13 分钟的时间来完成 20k 步,但即使 buffer_size 为 10,000(我的意思是我们正在等到我们处理了 10,000 个批次)我仍在等待 40 多分钟以使缓冲区变满。花这么长时间有意义吗?如果我知道我在 GCS 上的分片 .csv 已经随机化,是否可以接受 table 将这个 shuffle/buffer 大小变小?我正在尝试从 tf.train.shuffle_batch() 复制行为——但是,似乎在最坏的情况下它应该花费与达到 10k 步相同的 13 分钟才能填满缓冲区?
STEPS/SEC
即使缓冲区填满,全局 steps/sec 在与之前 input_fn() 得到 ~13 steps/sec。
马虎交错
我最终尝试用 sloppy_interleave() 替换 parallel_interleave() 因为这是@mrry 的另一个建议。当我切换到 sloppy_interleave 时,我得到了 14 steps/sec!我知道这意味着它不是确定性的,但这真的应该意味着它不是从一个 运行 (或纪元)到下一个的确定性吗?还是对此有更大的影响?我应该关心旧 shuffle_batch()
方法和 sloppy_interleave 之间的任何真正区别吗?这导致 4-5 倍改进的事实是否表明之前的阻塞因子是什么?
当您使用 tf.data.TextLineDataset
, each element is a scalar string. In this respect, it is more similar to using tf.TextLineReader.read()
, rather than the batch version tf.TextLineReader.read_up_to()
, which returns a vector of strings. Unfortunately the tf.string_split()
op 时需要矢量输入(尽管将来可能会更改),因此目前需要形状操作。
Lookup table 与 tf.data
中的函数的交互略有不同。直觉是你应该在Dataset.map()
调用外声明一次查找table(这样它将被初始化一次)然后在[=]中捕获它17=] 函数调用 vocab_table.lookup()
。像下面这样的东西应该可以工作:
def input_fn(...):
dataset = tf.data.Dataset.list_files(filenames).shuffle(num_shards)
# Define `vocab_table` outside the map function and use it in `parse_csv()`.
vocab_table = tf.contrib.lookup.index_table_from_file(
vocabulary_file=hparams.vocab_file, default_value=0)
def parse_csv(...):
columns = tf.decode_csv(rows, record_defaults=CSV_COLUMN_DEFAULTS)
raw_features = dict(zip(FIELDNAMES, columns))
words = tf.string_split([raw_features['sentences']]) # splitting words
# Use the captured `vocab_table` here.
word_indices = vocab_table.lookup(words)
# ...
features = ...
# NOTE: Structure the output here so that you can simply return
# the dataset from `input_fn()`.
labels = features.pop(LABEL_COLUMN)
return features, labels
# NOTE: Consider using `tf.contrib.data.parallel_interleave()` to perform
# the reads in parallel.
dataset = dataset.interleave(
lambda filename: (tf.data.TextLineDataset(filename)
.skip(1)
.map(lambda row: parse_csv(row, hparams),
num_parallel_calls=multiprocessing.cpu_count())),
cycle_length=5)
if shuffle:
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.repeat(num_epochs)
dataset = dataset.batch(batch_size)
# NOTE: Add prefetching here to run the input pipeline in the background.
dataset = dataset.prefetch(1)
# NOTE: This requires TensorFlow 1.5 or later, but this change simplifies the
# initialization of the lookup table.
return dataset
在 TF 1.4(目前是与 GCMLE 一起使用的最新版本的 TF)中,您将无法将 make_one_shot_iterator()
与查找表一起使用(参见相关的 ) you will need to use Dataset.make_initializable_iterator()
and then initialize iterator.initalizer
with your default TABLES_INITIALIZER
(from this )。 input_fn()
应该是这样的:
def input_fn(...):
dataset = tf.data.Dataset.list_files(filenames).shuffle(num_shards)
# Define `vocab_table` outside the map function and use it in `parse_csv()`.
vocab_table = tf.contrib.lookup.index_table_from_file(
vocabulary_file=hparams.vocab_file, default_value=0)
dataset = dataset.interleave(
lambda filename: (tf.data.TextLineDataset(filename)
.skip(1)
.map(lambda row: parse_csv(row, hparams),
num_parallel_calls=multiprocessing.cpu_count())),
cycle_length=5)
if shuffle:
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.repeat(num_epochs)
dataset = dataset.batch(batch_size)
iterator = dataset.make_initializable_iterator()
features = iterator.get_next()
# add iterator.intializer to be handled by default table initializers
tf.add_to_collection(tf.GraphKeys.TABLE_INITIALIZERS, iterator.initializer)
labels = features.pop(LABEL_COLUMN)
return features, labels
我有一个 GCMLE 实验,我正在尝试升级我的 input_fn
以使用新的 tf.data
功能。我基于此 sample
def input_fn(...):
dataset = tf.data.Dataset.list_files(filenames).shuffle(num_shards) # shuffle up the list of input files
dataset = dataset.interleave(lambda filename: # mix together records from cycle_length number of shards
tf.data.TextLineDataset(filename).skip(1).map(lambda row: parse_csv(row, hparams)), cycle_length=5)
if shuffle:
dataset = dataset.shuffle(buffer_size = 10000)
dataset = dataset.repeat(num_epochs)
dataset = dataset.batch(batch_size)
iterator = dataset.make_one_shot_iterator()
features = iterator.get_next()
labels = features.pop(LABEL_COLUMN)
return features, labels
我的parse_csv
和我之前用的是一样的,但是现在不能用了。我可以解决一些问题,但我不完全理解 为什么 我会遇到这些问题。这是我的 parse_csv() 函数
def parse_csv(..):
columns = tf.decode_csv(rows, record_defaults=CSV_COLUMN_DEFAULTS)
raw_features = dict(zip(FIELDNAMES, columns))
words = tf.string_split(raw_features['sentences']) # splitting words
vocab_table = tf.contrib.lookup.index_table_from_file(vocabulary_file = hparams.vocab_file,
default_value = 0)
....
马上这个
tf.string_split()
停止工作并且错误是ValueError: Shape must be rank 1 but is rank 0 for 'csv_preprocessing/input_sequence_generation/StringSplit' (op: 'StringSplit') with input shapes: [], [].
- 这很容易通过 [=21= 将raw_features['sentences']
打包到张量中来解决] 但我不明白为什么 thisdataset
方法需要这样做?为什么在旧版本中可以正常工作?为了使形状与我的模型的其余部分相匹配,我最终需要通过words = tf.squeeze(words, 0)
在最后删除这个额外的维度,因为我将这个 "unecessary" 维度添加到张量中。无论出于何种原因,我也收到 table 未初始化的错误
tensorflow.python.framework.errors_impl.FailedPreconditionError: Table not initialized.
但是,此代码完全适用于我的旧input_fn()
(见下文)所以我不知道为什么我现在需要初始化 tables?我还没有想出这部分的解决方案。在我的 parse_csv 函数中我是否缺少能够使用tf.contrib.lookup.index_table_from_file
的东西?
作为参考,这是我的旧 input_fn() 仍然有效:
def input_fn(...):
filename_queue = tf.train.string_input_producer(tf.train.match_filenames_once(filenames),
num_epochs=num_epochs, shuffle=shuffle, capacity=32)
reader = tf.TextLineReader(skip_header_lines=skip_header_lines)
_, rows = reader.read_up_to(filename_queue, num_records=batch_size)
features = parse_csv(rows, hparams)
if shuffle:
features = tf.train.shuffle_batch(
features,
batch_size,
min_after_dequeue=2 * batch_size + 1,
capacity=batch_size * 10,
num_threads=multiprocessing.cpu_count(),
enqueue_many=True,
allow_smaller_final_batch=True
)
else:
features = tf.train.batch(
features,
batch_size,
capacity=batch_size * 10,
num_threads=multiprocessing.cpu_count(),
enqueue_many=True,
allow_smaller_final_batch=True
)
labels = features.pop(LABEL_COLUMN)
return features, labels
更新 TF 1.7
我正在使用 TF 1.7(它应该具有@mrry 回答中提到的所有 TF 1.6 功能)重新访问它,但我仍然无法复制该行为。对于我的旧 input_fn()
,我可以得到大约 13 steps/sec。我使用的新功能如下:
def input_fn(...):
files = tf.data.Dataset.list_files(filenames).shuffle(num_shards)
dataset = files.apply(tf.contrib.data.parallel_interleave(lambda filename: tf.data.TextLineDataset(filename).skip(1), cycle_length=num_shards))
dataset = dataset.apply(tf.contrib.data.map_and_batch(lambda row:
parse_csv_dataset(row, hparams = hparams),
batch_size = batch_size,
num_parallel_batches = multiprocessing.cpu_count()))
dataset = dataset.prefetch(1)
if shuffle:
dataset = dataset.shuffle(buffer_size = 10000)
dataset = dataset.repeat(num_epochs)
iterator = dataset.make_initializable_iterator()
features = iterator.get_next()
tf.add_to_collection(tf.GraphKeys.TABLE_INITIALIZERS, iterator.initializer)
labels = {key: features.pop(key) for key in LABEL_COLUMNS}
return features, labels
我相信我正在关注所有 performance guildines,例如 1) 使用预取 2) 使用 map_and_batch 和 num_parallel_batches = 核心 3) 使用 parallel_interleave 4 ) 在重复之前应用随机播放。我没有使用的唯一步骤是缓存建议,但希望它真的只对第一个以及 "applying interleave, prefetch and shuffle first." 之后的时期有帮助——但是我发现在 [=87= 之后进行预取和随机播放] 大约有 10% 的加速。
缓冲区问题
我注意到的第一个性能问题是我的旧 input_fn()
花了我大约 13 分钟的时间来完成 20k 步,但即使 buffer_size 为 10,000(我的意思是我们正在等到我们处理了 10,000 个批次)我仍在等待 40 多分钟以使缓冲区变满。花这么长时间有意义吗?如果我知道我在 GCS 上的分片 .csv 已经随机化,是否可以接受 table 将这个 shuffle/buffer 大小变小?我正在尝试从 tf.train.shuffle_batch() 复制行为——但是,似乎在最坏的情况下它应该花费与达到 10k 步相同的 13 分钟才能填满缓冲区?
STEPS/SEC
即使缓冲区填满,全局 steps/sec 在与之前 input_fn() 得到 ~13 steps/sec。
马虎交错
我最终尝试用 sloppy_interleave() 替换 parallel_interleave() 因为这是@mrry 的另一个建议。当我切换到 sloppy_interleave 时,我得到了 14 steps/sec!我知道这意味着它不是确定性的,但这真的应该意味着它不是从一个 运行 (或纪元)到下一个的确定性吗?还是对此有更大的影响?我应该关心旧 shuffle_batch()
方法和 sloppy_interleave 之间的任何真正区别吗?这导致 4-5 倍改进的事实是否表明之前的阻塞因子是什么?
当您使用
tf.data.TextLineDataset
, each element is a scalar string. In this respect, it is more similar to usingtf.TextLineReader.read()
, rather than the batch versiontf.TextLineReader.read_up_to()
, which returns a vector of strings. Unfortunately thetf.string_split()
op 时需要矢量输入(尽管将来可能会更改),因此目前需要形状操作。Lookup table 与
tf.data
中的函数的交互略有不同。直觉是你应该在Dataset.map()
调用外声明一次查找table(这样它将被初始化一次)然后在[=]中捕获它17=] 函数调用vocab_table.lookup()
。像下面这样的东西应该可以工作:def input_fn(...): dataset = tf.data.Dataset.list_files(filenames).shuffle(num_shards) # Define `vocab_table` outside the map function and use it in `parse_csv()`. vocab_table = tf.contrib.lookup.index_table_from_file( vocabulary_file=hparams.vocab_file, default_value=0) def parse_csv(...): columns = tf.decode_csv(rows, record_defaults=CSV_COLUMN_DEFAULTS) raw_features = dict(zip(FIELDNAMES, columns)) words = tf.string_split([raw_features['sentences']]) # splitting words # Use the captured `vocab_table` here. word_indices = vocab_table.lookup(words) # ... features = ... # NOTE: Structure the output here so that you can simply return # the dataset from `input_fn()`. labels = features.pop(LABEL_COLUMN) return features, labels # NOTE: Consider using `tf.contrib.data.parallel_interleave()` to perform # the reads in parallel. dataset = dataset.interleave( lambda filename: (tf.data.TextLineDataset(filename) .skip(1) .map(lambda row: parse_csv(row, hparams), num_parallel_calls=multiprocessing.cpu_count())), cycle_length=5) if shuffle: dataset = dataset.shuffle(buffer_size=10000) dataset = dataset.repeat(num_epochs) dataset = dataset.batch(batch_size) # NOTE: Add prefetching here to run the input pipeline in the background. dataset = dataset.prefetch(1) # NOTE: This requires TensorFlow 1.5 or later, but this change simplifies the # initialization of the lookup table. return dataset
在 TF 1.4(目前是与 GCMLE 一起使用的最新版本的 TF)中,您将无法将 make_one_shot_iterator()
与查找表一起使用(参见相关的 Dataset.make_initializable_iterator()
and then initialize iterator.initalizer
with your default TABLES_INITIALIZER
(from this input_fn()
应该是这样的:
def input_fn(...):
dataset = tf.data.Dataset.list_files(filenames).shuffle(num_shards)
# Define `vocab_table` outside the map function and use it in `parse_csv()`.
vocab_table = tf.contrib.lookup.index_table_from_file(
vocabulary_file=hparams.vocab_file, default_value=0)
dataset = dataset.interleave(
lambda filename: (tf.data.TextLineDataset(filename)
.skip(1)
.map(lambda row: parse_csv(row, hparams),
num_parallel_calls=multiprocessing.cpu_count())),
cycle_length=5)
if shuffle:
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.repeat(num_epochs)
dataset = dataset.batch(batch_size)
iterator = dataset.make_initializable_iterator()
features = iterator.get_next()
# add iterator.intializer to be handled by default table initializers
tf.add_to_collection(tf.GraphKeys.TABLE_INITIALIZERS, iterator.initializer)
labels = features.pop(LABEL_COLUMN)
return features, labels