使用 tf.contrib.data.parallel_interleave 并行化 tf.from_generator
Parallelize tf.from_generator using tf.contrib.data.parallel_interleave
我有一堆 JSON 数组文件(准确地说是 AVRO),每个文件都会产生多个样本来训练 Keras 模型。使用来自 and from @jsimsa 的想法,我能够想出这个来并行化我的输入管道。无法弄清楚如何设计 generator(n)
来划分处理文件的工作。代码在 parse_file(f)
内失败,因为该函数需要字符串文件路径而不是 Tensor
、
N = num_cores = 2
files_to_process = ["f1.avro", "f2.avro", "f3.avro"]
shuffle_size = prefetch_buffer = 1000
batch_size = 512
def generator(n):
size = math.ceil(len(files_to_process) / N)
start_index = n * size
end_index = start_index + size
def gen():
# for f in files_to_process[start_index:end_index]:
for f in tf.slice(files_to_process, start_index, size):
yield f
return gen
def dataset(n):
return tf.data.Dataset.from_generator(generator(n), (tf.string,))
def process_file(f):
examples_x, examples_y = parse_file(f)
return examples_x, examples_y
ds = tf.data.Dataset.range(N)
ds = ds.apply(tf.contrib.data.parallel_interleave(dataset, cycle_length=N))
ds = ds.map(process_file, num_parallel_calls=N)
ds = ds.prefetch(prefetch_buffer)
ds = ds.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
ds = ds.batch(batch_size).shuffle(shuffle_size)
...
myTfKerasModel.fit( ds.make_one_iterator(), NUM_TRAIN_SAMPLES // batch_size )
- 这里
generator(n)
的正确设计方法是什么
- 这是使用
parallel_interleave
和 flat_map
设计我的输入管道的优化方法吗
在我看来,生成器让您的生活变得不必要地复杂化了。
这就是我实现输入管道的方式:
def parse_file_tf(filename):
return tf.py_func(parse_file, [filename], [tf.float32, tf.float32])
# version with map
files = tf.data.Dataset.from_tensor_slices(files_to_process)
dataset = files.map(parse_file_tf, num_parallel_calls=N)
dataset = dataset.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
dataset = dataset.batch(batch_size).shuffle(shuffle_size).prefetch(2)
it = dataset.make_one_shot_iterator()
为了测试它,我将一个虚拟 parse_file
定义为:
i=0
def parse_file(f):
global i
i += 1
return np.asarray([i]*i, dtype=np.float32), np.asarray([i]*i, dtype=np.float32) # mimicks variable-length examples_x, examples_y
我将其输入到一个基本循环中,该循环显示迭代器 returns:
sess = tf.Session()
try:
while True:
x, y = it.get_next()
vx, vy = sess.run([x,y])
print(vx)
print(vy)
except tf.errors.OutOfRangeError:
pass
sess.close()
运行 上面的代码打印:
[2. 3. 2. 1. 3. 3.]
[2. 3. 2. 1. 3. 3.]
管道说明
基本上,我将并行化问题留给 map
,我可以在其中传递它应该 运行 的线程数。不需要生成器遍历范围和那些额外的复杂性。
我选择 map 而不是 parallel_interleave
,因为后者要求您为它 returns 的每个项目生成一个 Dataset
实例,这在您的情况下并没有真正意义,因为您当您 运行 parse_file
时,已经将所有值加载到内存中。
如果您缓慢生成值(例如,通过将 tf.data.TFRecordDataset
应用于文件名列表),parallel_interleave
是有意义的,但如果您的数据集适合内存,请选择 map
.
关于 tf.py_func
限制,它们不会影响您训练的网络,只会影响输入管道。理想情况下,您将有不同的管道用于训练和网络的最终使用。您只需要注意后者期间的限制,而对于培训(除非您对分布式培训做一些非常具体的事情 and/or 在机器之间移动培训),您是相当安全的。
带生成器的版本
如果您的 JSON 文件非常大并且它们的内容不适合内存,您可以使用生成器,但与您开始使用的方法略有不同。
这个想法是,生成器一次遍历 JSON 文件和 yield
s 一条记录。然后,生成器必须是您的 parse_file
函数。例如,假设您有以下 parse_file
生成器:
i = 3
def parse_file(filename):
global i
i += 1
ctr = 0
while ctr < i:
yield ctr, ctr
在这种情况下,管道将如下所示:
def wrap_generator(filename):
return tf.data.Dataset.from_generator(parse_file(filename), [tf.int32, tf.int32])
files = tf.data.Dataset.from_tensor_slices(files_to_process)
dataset = files.apply(tf.contrib.data.parallel_interleave(wrap_generator, cycle_length=N))
dataset = dataset.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
dataset = dataset.shuffle(shuffle_size).batch(batch_size).prefetch(2)
it = dataset.make_one_shot_iterator()
请注意,这里我们需要使用 parallel_interleave
,因为我们将生成器转换为 Dataset
个实例,我们从中提取值。
其余保持不变。
将其送入与上述打印相同的示例循环:
[6. 5. 4. 4. 6. 5. 6. 6. 5. 4. 6. 4. 5. 5. 6.]
[6. 5. 4. 4. 6. 5. 6. 6. 5. 4. 6. 4. 5. 5. 6.]
我有一堆 JSON 数组文件(准确地说是 AVRO),每个文件都会产生多个样本来训练 Keras 模型。使用来自 generator(n)
来划分处理文件的工作。代码在 parse_file(f)
内失败,因为该函数需要字符串文件路径而不是 Tensor
、
N = num_cores = 2
files_to_process = ["f1.avro", "f2.avro", "f3.avro"]
shuffle_size = prefetch_buffer = 1000
batch_size = 512
def generator(n):
size = math.ceil(len(files_to_process) / N)
start_index = n * size
end_index = start_index + size
def gen():
# for f in files_to_process[start_index:end_index]:
for f in tf.slice(files_to_process, start_index, size):
yield f
return gen
def dataset(n):
return tf.data.Dataset.from_generator(generator(n), (tf.string,))
def process_file(f):
examples_x, examples_y = parse_file(f)
return examples_x, examples_y
ds = tf.data.Dataset.range(N)
ds = ds.apply(tf.contrib.data.parallel_interleave(dataset, cycle_length=N))
ds = ds.map(process_file, num_parallel_calls=N)
ds = ds.prefetch(prefetch_buffer)
ds = ds.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
ds = ds.batch(batch_size).shuffle(shuffle_size)
...
myTfKerasModel.fit( ds.make_one_iterator(), NUM_TRAIN_SAMPLES // batch_size )
- 这里
generator(n)
的正确设计方法是什么 - 这是使用
parallel_interleave
和flat_map
设计我的输入管道的优化方法吗
在我看来,生成器让您的生活变得不必要地复杂化了。 这就是我实现输入管道的方式:
def parse_file_tf(filename):
return tf.py_func(parse_file, [filename], [tf.float32, tf.float32])
# version with map
files = tf.data.Dataset.from_tensor_slices(files_to_process)
dataset = files.map(parse_file_tf, num_parallel_calls=N)
dataset = dataset.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
dataset = dataset.batch(batch_size).shuffle(shuffle_size).prefetch(2)
it = dataset.make_one_shot_iterator()
为了测试它,我将一个虚拟 parse_file
定义为:
i=0
def parse_file(f):
global i
i += 1
return np.asarray([i]*i, dtype=np.float32), np.asarray([i]*i, dtype=np.float32) # mimicks variable-length examples_x, examples_y
我将其输入到一个基本循环中,该循环显示迭代器 returns:
sess = tf.Session()
try:
while True:
x, y = it.get_next()
vx, vy = sess.run([x,y])
print(vx)
print(vy)
except tf.errors.OutOfRangeError:
pass
sess.close()
运行 上面的代码打印:
[2. 3. 2. 1. 3. 3.]
[2. 3. 2. 1. 3. 3.]
管道说明
基本上,我将并行化问题留给 map
,我可以在其中传递它应该 运行 的线程数。不需要生成器遍历范围和那些额外的复杂性。
我选择 map 而不是 parallel_interleave
,因为后者要求您为它 returns 的每个项目生成一个 Dataset
实例,这在您的情况下并没有真正意义,因为您当您 运行 parse_file
时,已经将所有值加载到内存中。
如果您缓慢生成值(例如,通过将 tf.data.TFRecordDataset
应用于文件名列表),parallel_interleave
是有意义的,但如果您的数据集适合内存,请选择 map
.
关于 tf.py_func
限制,它们不会影响您训练的网络,只会影响输入管道。理想情况下,您将有不同的管道用于训练和网络的最终使用。您只需要注意后者期间的限制,而对于培训(除非您对分布式培训做一些非常具体的事情 and/or 在机器之间移动培训),您是相当安全的。
带生成器的版本
如果您的 JSON 文件非常大并且它们的内容不适合内存,您可以使用生成器,但与您开始使用的方法略有不同。
这个想法是,生成器一次遍历 JSON 文件和 yield
s 一条记录。然后,生成器必须是您的 parse_file
函数。例如,假设您有以下 parse_file
生成器:
i = 3
def parse_file(filename):
global i
i += 1
ctr = 0
while ctr < i:
yield ctr, ctr
在这种情况下,管道将如下所示:
def wrap_generator(filename):
return tf.data.Dataset.from_generator(parse_file(filename), [tf.int32, tf.int32])
files = tf.data.Dataset.from_tensor_slices(files_to_process)
dataset = files.apply(tf.contrib.data.parallel_interleave(wrap_generator, cycle_length=N))
dataset = dataset.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
dataset = dataset.shuffle(shuffle_size).batch(batch_size).prefetch(2)
it = dataset.make_one_shot_iterator()
请注意,这里我们需要使用 parallel_interleave
,因为我们将生成器转换为 Dataset
个实例,我们从中提取值。
其余保持不变。
将其送入与上述打印相同的示例循环:
[6. 5. 4. 4. 6. 5. 6. 6. 5. 4. 6. 4. 5. 5. 6.]
[6. 5. 4. 4. 6. 5. 6. 6. 5. 4. 6. 4. 5. 5. 6.]