按顺序迭代一个 dask 包

Iterate sequentially over a dask bag

我需要将非常大的元素 dask.bag 提交到非线程安全存储,即我需要类似

for x in dbag:
    store.add(x)

我无法使用 compute,因为包太大,无法放入内存。 我需要更像 distributed.as_completed 的东西,但它适用于包,而 distributed.as_completed 不适用。

我可能会继续使用普通计算,但会加一把锁

def commit(x, lock=None):
    with lock:
        store.add(x)

b.map(commit, lock=my_lock)

您可以在哪里创建 threading.Lockmultiprocessing.Lock,具体取决于您正在进行的处理类型

如果您想使用 as_completed,您可以将您的包转换为期货并在其上使用 as_completed。

from distributed.client import futures_of, as_completed
b = b.persist()
futures = futures_of(b)

for future in as_completed(futures):
    for x in future.result():
        store.add(x)

你也可以转换成dataframe,我相信这样迭代更明智

df = b.to_dataframe(...)
for x in df.iteritems(...):
    ...