tf.Data:什么是并行交织中的掉队者?
tf.Data: what are stragglers in parallel interleaving?
interleave
is a tf.Data.Dataset
method that can be used to interleave together elements from multiple datasets. tf.contrib.data.parallel_interleave
provides a parallel version of the same functionality with the help of apply
.
我可以看到,并行读取多个数据集并为它们设置缓冲区(并行版本允许)将提高吞吐量。但是 documentation 也有关于 parallel_interleave
如何增加数据吞吐量的说法:
Unlike tf.data.Dataset.interleave, it gets elements from cycle_length
nested datasets in parallel, which increases the throughput,
especially in the presence of stragglers.
散乱者到底是什么,为什么 parallel_interleave
在吞吐量方面表现得特别好?
散乱者是一种需要比正常时间更长的时间来产生输出的函数。这可能是由于网络拥塞或随机性的奇怪组合。
interleave
在单个线程上以顺序方式执行所有处理。在下面的schema中,让___
表示等待IO/Computation,<waiting>
表示等待轮到它吐出一个元素 和 111
表示 产生第一个元素 (1
).
假设我们有一个目录 ds = [A, B, C, D]
的数据集,我们从每个目录中生成文件 1,2,3...
。然后使用 r = ds.interleave(cycle_length=3, block_length=2)
会像这样工作:
A: ___111___222
B: <waiting> ___111___________222
C: <waiting> <waiting> <waiting> ___111___222
R: ____A1____A2____B1____________B2____C1____C2
您看到如果从 B 零散地生成元素,则所有后续元素都必须等待处理。
parallel_interleave
以两种方式帮助掉队者。首先,它启动循环中的每个元素 并行 (因此得名)。因此,生产模式变为:
A: ___111___222
B: ___<waiting>111___________222
C: ___<waiting><waiting><waitin>111___222
R: ____A1____A2_B1____________B2_C1____C2|....|
这样做有助于通过并行等待减少无用的等待。 |....|
部分显示了与顺序版本相比我们节省了多少。
第二种帮助方式是允许 sloppy
参数。如果我们将它设置为 True
,它允许跳过一个不可用的元素直到它可用,代价是产生一个不确定的顺序。方法如下:
A: ___111___<w>222
B: ___<w>111___________222
C: ___<w><w>111___222
R: ____A1_B1_C1_A2_C2___B2|...................|
看看节省的钱!!还要看元素的顺序!
我在代码中重现了这些。这是一种丑陋的方式,但它稍微说明了差异。
from time import sleep
DS = tf.data.Dataset
def repeater(val):
def _slow_gen():
for i in range(5):
if i % 2:
sleep(1)
yield i
return DS.from_generator(_slow_gen, tf.int8)
ds = DS.range(5)
slow_ds = ds.interleave(repeater, cycle_length=2, block_length=3)
para_ds = ds.apply(tf.contrib.data.parallel_interleave(
repeater, cycle_length=2, block_length=3)
)
sloppy_ds = ds.apply(tf.contrib.data.parallel_interleave(
repeater, cycle_length=2, block_length=3, sloppy=True)
)
%time apply_python_func(slow_ds, print, sess)
# 10 sec, you see it waiting each time
%time apply_python_func(para_ds, print, sess)
# 3 sec always! you see it burping a lot after the first wait
%time apply_python_func(sloppy_ds, print, sess)
# sometimes 3, sometimes 4 seconds
这里是显示数据集的函数
def apply_python_func(ds, func, sess):
"""Exact values from ds using sess and apply func on them"""
it = ds.make_one_shot_iterator()
next_value = it.get_next()
num_examples = 0
while True:
try:
value = sess.run(next_value)
num_examples += 1
func(value)
except tf.errors.OutOfRangeError:
break
print('Evaluated {} examples'.format(num_examples))
interleave
is a tf.Data.Dataset
method that can be used to interleave together elements from multiple datasets. tf.contrib.data.parallel_interleave
provides a parallel version of the same functionality with the help of apply
.
我可以看到,并行读取多个数据集并为它们设置缓冲区(并行版本允许)将提高吞吐量。但是 documentation 也有关于 parallel_interleave
如何增加数据吞吐量的说法:
Unlike tf.data.Dataset.interleave, it gets elements from cycle_length nested datasets in parallel, which increases the throughput, especially in the presence of stragglers.
散乱者到底是什么,为什么 parallel_interleave
在吞吐量方面表现得特别好?
散乱者是一种需要比正常时间更长的时间来产生输出的函数。这可能是由于网络拥塞或随机性的奇怪组合。
interleave
在单个线程上以顺序方式执行所有处理。在下面的schema中,让___
表示等待IO/Computation,<waiting>
表示等待轮到它吐出一个元素 和 111
表示 产生第一个元素 (1
).
假设我们有一个目录 ds = [A, B, C, D]
的数据集,我们从每个目录中生成文件 1,2,3...
。然后使用 r = ds.interleave(cycle_length=3, block_length=2)
会像这样工作:
A: ___111___222
B: <waiting> ___111___________222
C: <waiting> <waiting> <waiting> ___111___222
R: ____A1____A2____B1____________B2____C1____C2
您看到如果从 B 零散地生成元素,则所有后续元素都必须等待处理。
parallel_interleave
以两种方式帮助掉队者。首先,它启动循环中的每个元素 并行 (因此得名)。因此,生产模式变为:
A: ___111___222
B: ___<waiting>111___________222
C: ___<waiting><waiting><waitin>111___222
R: ____A1____A2_B1____________B2_C1____C2|....|
这样做有助于通过并行等待减少无用的等待。 |....|
部分显示了与顺序版本相比我们节省了多少。
第二种帮助方式是允许 sloppy
参数。如果我们将它设置为 True
,它允许跳过一个不可用的元素直到它可用,代价是产生一个不确定的顺序。方法如下:
A: ___111___<w>222
B: ___<w>111___________222
C: ___<w><w>111___222
R: ____A1_B1_C1_A2_C2___B2|...................|
看看节省的钱!!还要看元素的顺序!
我在代码中重现了这些。这是一种丑陋的方式,但它稍微说明了差异。
from time import sleep
DS = tf.data.Dataset
def repeater(val):
def _slow_gen():
for i in range(5):
if i % 2:
sleep(1)
yield i
return DS.from_generator(_slow_gen, tf.int8)
ds = DS.range(5)
slow_ds = ds.interleave(repeater, cycle_length=2, block_length=3)
para_ds = ds.apply(tf.contrib.data.parallel_interleave(
repeater, cycle_length=2, block_length=3)
)
sloppy_ds = ds.apply(tf.contrib.data.parallel_interleave(
repeater, cycle_length=2, block_length=3, sloppy=True)
)
%time apply_python_func(slow_ds, print, sess)
# 10 sec, you see it waiting each time
%time apply_python_func(para_ds, print, sess)
# 3 sec always! you see it burping a lot after the first wait
%time apply_python_func(sloppy_ds, print, sess)
# sometimes 3, sometimes 4 seconds
这里是显示数据集的函数
def apply_python_func(ds, func, sess):
"""Exact values from ds using sess and apply func on them"""
it = ds.make_one_shot_iterator()
next_value = it.get_next()
num_examples = 0
while True:
try:
value = sess.run(next_value)
num_examples += 1
func(value)
except tf.errors.OutOfRangeError:
break
print('Evaluated {} examples'.format(num_examples))