从生成器创建一个 dask 包

Creating a dask bag from a generator

我想从生成器列表中创建一个 dask.Bag(或 dask.Array)。陷阱是生成器(在评估时)对于内存来说太大了。

delayed_array = [delayed(generator) for generator in list_of_generators]
my_bag = db.from_delayed(delayed_array)

NB list_of_generators 正是这样——发电机还没有被消耗(还)。

我的问题是,在创建 delayed_array 时,生成器被消耗并且 RAM 耗尽。有没有一种方法可以将这些长列表放入 Bag 而无需首先使用它们,或者至少以块的形式使用它们,从而将 RAM 使用率保持在较低水平?

NNB 我 可以 将生成器写入磁盘,然后将文件加载到 Bag - 但我认为我可以使用 dask 解决这个问题?

Dask.bag 的一个不错的子集可以使用大型迭代器。您的解决方案几乎是完美的,但您需要提供一个在调用时创建生成器的函数,而不是生成器本身。

In [1]: import dask.bag as db

In [2]: import dask

In [3]: b = db.from_delayed([dask.delayed(range)(i) for i in [100000000] * 5])

In [4]: b
Out[4]: dask.bag<bag-fro..., npartitions=5>

In [5]: b.take(5)
Out[5]: (0, 1, 2, 3, 4)

In [6]: b.sum()
Out[6]: <dask.bag.core.Item at 0x7f852d8737b8>

In [7]: b.sum().compute()
Out[7]: 24999999750000000

但是,这肯定会影响您。一些稍微复杂的 dask 包操作确实需要使分区具体化,这可能会耗尽 RAM。