Keras 的 OrderedEnqueuer 中是否保证批次的顺序?
Is the order of batches guaranteed in Keras' OrderedEnqueuer?
我有一个自定义 keras.utils.sequence
,它按特定(和关键)顺序生成批次。
但是,我需要跨多个内核并行化批处理生成。名称“OrderedEnqueuer
”是否意味着结果队列中的批次顺序保证与原始 keras.utils.sequence
的顺序相同?
我认为此订单无法保证的原因:
- OrderedEnqueuer 在内部使用 python
multiprocessing
的 apply_async
。
- Keras 的文档明确表示
OrderedEnqueuer
保证不会重复批次 - 但不保证顺序。
我认为是的原因:
- 姓名!
- 我知道
keras.utils.sequence
对象是可索引的。
- 我在 Keras 的 github 上找到了测试脚本,这些脚本似乎是为了验证顺序而设计的——尽管我找不到任何关于这些是否通过,或者它们是否真正具有决定性的文档。
如果不能保证这里的顺序,我欢迎任何关于如何在保持保证顺序的同时并行化批处理准备的建议,前提是它必须能够并行化任意 python 代码 - 我相信例如 tf.data.Dataset
API 不允许这样做(tf.py_function
回调到原始 python 进程)。
是的,已下单。
通过以下测试自行检查。
首先,让我们创建一个虚拟 Sequence
,returns 只是等待随机时间后的批次索引(随机时间是为了确保批次不会按顺序完成):
import time, random, datetime
import numpy as np
import tensorflow as tf
class DataLoader(tf.keras.utils.Sequence):
def __len__(self):
return 10
def __getitem__(self, i):
time.sleep(random.randint(1,2))
#you could add a print here to see that it's out of order
return i
现在让我们创建一个测试函数来创建入队器并使用它。
该函数获取工人数量并打印花费的时间以及返回的结果。
def test(workers):
enq = tf.keras.utils.OrderedEnqueuer(DataLoader())
enq.start(workers = workers)
gen = enq.get()
results = []
start = datetime.datetime.now()
for i in range(30):
results.append(next(gen))
enq.stop()
print('test with', workers, 'workers took', datetime.datetime.now() - start)
print("results:", results)
结果:
test(1)
test(8)
test with 1 workers took 0:00:45.093122
results: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
test with 8 workers took 0:00:09.127771
results: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
注意:
- 8 个 worker 比 1 个 worker 快得多 -> 并行化没问题
- 两种情况的结果都已排序
我有一个自定义 keras.utils.sequence
,它按特定(和关键)顺序生成批次。
但是,我需要跨多个内核并行化批处理生成。名称“OrderedEnqueuer
”是否意味着结果队列中的批次顺序保证与原始 keras.utils.sequence
的顺序相同?
我认为此订单无法保证的原因:
- OrderedEnqueuer 在内部使用 python
multiprocessing
的apply_async
。 - Keras 的文档明确表示
OrderedEnqueuer
保证不会重复批次 - 但不保证顺序。
我认为是的原因:
- 姓名!
- 我知道
keras.utils.sequence
对象是可索引的。 - 我在 Keras 的 github 上找到了测试脚本,这些脚本似乎是为了验证顺序而设计的——尽管我找不到任何关于这些是否通过,或者它们是否真正具有决定性的文档。
如果不能保证这里的顺序,我欢迎任何关于如何在保持保证顺序的同时并行化批处理准备的建议,前提是它必须能够并行化任意 python 代码 - 我相信例如 tf.data.Dataset
API 不允许这样做(tf.py_function
回调到原始 python 进程)。
是的,已下单。
通过以下测试自行检查。
首先,让我们创建一个虚拟 Sequence
,returns 只是等待随机时间后的批次索引(随机时间是为了确保批次不会按顺序完成):
import time, random, datetime
import numpy as np
import tensorflow as tf
class DataLoader(tf.keras.utils.Sequence):
def __len__(self):
return 10
def __getitem__(self, i):
time.sleep(random.randint(1,2))
#you could add a print here to see that it's out of order
return i
现在让我们创建一个测试函数来创建入队器并使用它。 该函数获取工人数量并打印花费的时间以及返回的结果。
def test(workers):
enq = tf.keras.utils.OrderedEnqueuer(DataLoader())
enq.start(workers = workers)
gen = enq.get()
results = []
start = datetime.datetime.now()
for i in range(30):
results.append(next(gen))
enq.stop()
print('test with', workers, 'workers took', datetime.datetime.now() - start)
print("results:", results)
结果:
test(1)
test(8)
test with 1 workers took 0:00:45.093122
results: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
test with 8 workers took 0:00:09.127771
results: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
注意:
- 8 个 worker 比 1 个 worker 快得多 -> 并行化没问题
- 两种情况的结果都已排序