如何按顺序聚合 dask Bag 的内容?
How to sequentially aggregate the content of a dask Bag?
我想使用非关联的聚合函数顺序聚合分区集合的内容,因此我不能使用 Bag.fold
或 Bag.reduction
。
似乎有 Bag.accumulate
执行此操作,但它 returns 包含一些每个分区的中间结果而不仅仅是最终聚合的包:
>>> import dask.bag as db
>>>
>>> def collect(acc, e):
... if acc is None:
... acc = list()
... acc.append(e)
... return acc
...
>>> b = db.from_sequence(range(10), npartitions=3)
>>> b.accumulate(collect, initial=None).compute()
[None,
[0, 1, 2, 3],
[0, 1, 2, 3],
[0, 1, 2, 3],
[0, 1, 2, 3],
[0, 1, 2, 3, 4, 5, 6, 7],
[0, 1, 2, 3, 4, 5, 6, 7],
[0, 1, 2, 3, 4, 5, 6, 7],
[0, 1, 2, 3, 4, 5, 6, 7],
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]
基本上我只对 accumulate
输出的最后一个元素感兴趣,我不想在内存中保留中间步骤的副本。
Bag 目前没有顺序缩减操作,但它可以。今天完成此操作的一种简单方法是像上面那样使用 use accumulate
,但只要求最后一个分区的最后一个元素。我们可以通过使用 Bag.to_delayed
将包转换为延迟值来相对容易地做到这一点
acc = b.accumulate(collect, initial=None)
partitions = acc.to_delayed()
partitions[-1][-1].compute()
我想使用非关联的聚合函数顺序聚合分区集合的内容,因此我不能使用 Bag.fold
或 Bag.reduction
。
似乎有 Bag.accumulate
执行此操作,但它 returns 包含一些每个分区的中间结果而不仅仅是最终聚合的包:
>>> import dask.bag as db
>>>
>>> def collect(acc, e):
... if acc is None:
... acc = list()
... acc.append(e)
... return acc
...
>>> b = db.from_sequence(range(10), npartitions=3)
>>> b.accumulate(collect, initial=None).compute()
[None,
[0, 1, 2, 3],
[0, 1, 2, 3],
[0, 1, 2, 3],
[0, 1, 2, 3],
[0, 1, 2, 3, 4, 5, 6, 7],
[0, 1, 2, 3, 4, 5, 6, 7],
[0, 1, 2, 3, 4, 5, 6, 7],
[0, 1, 2, 3, 4, 5, 6, 7],
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]
基本上我只对 accumulate
输出的最后一个元素感兴趣,我不想在内存中保留中间步骤的副本。
Bag 目前没有顺序缩减操作,但它可以。今天完成此操作的一种简单方法是像上面那样使用 use accumulate
,但只要求最后一个分区的最后一个元素。我们可以通过使用 Bag.to_delayed
acc = b.accumulate(collect, initial=None)
partitions = acc.to_delayed()
partitions[-1][-1].compute()