Dask 能否自动创建一棵树来并行计算并减少 worker 之间的副本?

Can Dask automatically create a tree to parallelize a computation and reduce the copies between workers?

我将其分为两个部分,背景和问题。问题一直在底部。

背景:

假设我想(使用 Dask 分布式)做一个令人尴尬的并行计算,比如对 16 个巨大的数据帧求和。我知道使用 CUDA 会非常快,但让我们在这个例子中继续使用 Dask。

完成此操作(使用延迟)的基本方法是:

from functools import reduce
import math
from dask import delayed, compute, visualize
import dask.distributed as dd
import numpy as np

@delayed
def gen_matrix():
    return np.random.rand(1000, 1000)

@delayed
def calc_sum(matrices):
    return reduce(lambda a, b: a + b, matrices)

if __name__ == '__main__':

    num_matrices = 16

    # Plop them into a big list
    matrices = [gen_matrix() for _ in range(num_matrices)]

    # Here's the Big Sum
    matrices = calc_sum(matrices)

    # Go!
    with dd.Client('localhost:8786') as client:
        f = client.submit(compute, matrices)
        result = client.gather(f)

这是动态图:

这肯定会奏效,但是随着矩阵的大小(见上文 gen_matrix)变得太大,Dask 分布式工作者开始遇到三个问题:

  1. 他们向执行求和的主要工作人员发送数据超时
  2. 主要工作人员在收集所有矩阵时内存不足
  3. 总和不是运行并行(只有matrix ganeration是)

请注意,其中 none 这些问题是 Dask 的错,它按照宣传的方式工作。我刚刚设置的计算很糟糕。

一个解决方案是将其分解为树计算,如下所示,以及该图的粗略可视化:

from functools import reduce
import math
from dask import delayed, compute, visualize
import dask.distributed as dd
import numpy as np

@delayed
def gen_matrix():
    return np.random.rand(1000, 1000)

@delayed
def calc_sum(a, b):
    return a + b

if __name__ == '__main__':

    num_matrices = 16

    # Plop them into a big list
    matrices = [gen_matrix() for _ in range(num_matrices)]

    # This tells us the depth of the calculation portion
    # of the tree we are constructing in the next step
    depth = int(math.log(num_matrices, 2))

    # This is the code I don't want to have to manually write
    for _ in range(depth):
        matrices = [
            calc_sum(matrices[i], matrices[i+1])
            for i in range(0, len(matrices), 2)
        ]

    # Go!
    with dd.Client('localhost:8786') as client:
        f = client.submit(compute, matrices)
        result = client.gather(f)

图表:

问题:

我希望能够通过库或 Dask 本身完成这棵树的生成。我怎样才能做到这一点?

对于那些想知道的人,为什么不直接使用上面的代码呢?因为有些极端情况我不想编写代码,也因为它只是需要编写更多代码:)

我也看到了这个:

functools 或 itertools 中是否有知道如何执行此操作的东西(并且可以与 dask.delayed 一起使用)?

Dask 包有一个 reduction/aggregation 方法可以生成树状 DAG:fold

工作流程是 'bag' 延迟对象然后折叠它们。