Dask 能否自动创建一棵树来并行计算并减少 worker 之间的副本?
Can Dask automatically create a tree to parallelize a computation and reduce the copies between workers?
我将其分为两个部分,背景和问题。问题一直在底部。
背景:
假设我想(使用 Dask 分布式)做一个令人尴尬的并行计算,比如对 16 个巨大的数据帧求和。我知道使用 CUDA 会非常快,但让我们在这个例子中继续使用 Dask。
完成此操作(使用延迟)的基本方法是:
from functools import reduce
import math
from dask import delayed, compute, visualize
import dask.distributed as dd
import numpy as np
@delayed
def gen_matrix():
return np.random.rand(1000, 1000)
@delayed
def calc_sum(matrices):
return reduce(lambda a, b: a + b, matrices)
if __name__ == '__main__':
num_matrices = 16
# Plop them into a big list
matrices = [gen_matrix() for _ in range(num_matrices)]
# Here's the Big Sum
matrices = calc_sum(matrices)
# Go!
with dd.Client('localhost:8786') as client:
f = client.submit(compute, matrices)
result = client.gather(f)
这是动态图:
这肯定会奏效,但是随着矩阵的大小(见上文 gen_matrix)变得太大,Dask 分布式工作者开始遇到三个问题:
- 他们向执行求和的主要工作人员发送数据超时
- 主要工作人员在收集所有矩阵时内存不足
- 总和不是运行并行(只有matrix ganeration是)
请注意,其中 none 这些问题是 Dask 的错,它按照宣传的方式工作。我刚刚设置的计算很糟糕。
一个解决方案是将其分解为树计算,如下所示,以及该图的粗略可视化:
from functools import reduce
import math
from dask import delayed, compute, visualize
import dask.distributed as dd
import numpy as np
@delayed
def gen_matrix():
return np.random.rand(1000, 1000)
@delayed
def calc_sum(a, b):
return a + b
if __name__ == '__main__':
num_matrices = 16
# Plop them into a big list
matrices = [gen_matrix() for _ in range(num_matrices)]
# This tells us the depth of the calculation portion
# of the tree we are constructing in the next step
depth = int(math.log(num_matrices, 2))
# This is the code I don't want to have to manually write
for _ in range(depth):
matrices = [
calc_sum(matrices[i], matrices[i+1])
for i in range(0, len(matrices), 2)
]
# Go!
with dd.Client('localhost:8786') as client:
f = client.submit(compute, matrices)
result = client.gather(f)
图表:
问题:
我希望能够通过库或 Dask 本身完成这棵树的生成。我怎样才能做到这一点?
对于那些想知道的人,为什么不直接使用上面的代码呢?因为有些极端情况我不想编写代码,也因为它只是需要编写更多代码:)
我也看到了这个:
functools 或 itertools 中是否有知道如何执行此操作的东西(并且可以与 dask.delayed 一起使用)?
Dask 包有一个 reduction/aggregation 方法可以生成树状 DAG:fold。
工作流程是 'bag' 延迟对象然后折叠它们。
我将其分为两个部分,背景和问题。问题一直在底部。
背景:
假设我想(使用 Dask 分布式)做一个令人尴尬的并行计算,比如对 16 个巨大的数据帧求和。我知道使用 CUDA 会非常快,但让我们在这个例子中继续使用 Dask。
完成此操作(使用延迟)的基本方法是:
from functools import reduce
import math
from dask import delayed, compute, visualize
import dask.distributed as dd
import numpy as np
@delayed
def gen_matrix():
return np.random.rand(1000, 1000)
@delayed
def calc_sum(matrices):
return reduce(lambda a, b: a + b, matrices)
if __name__ == '__main__':
num_matrices = 16
# Plop them into a big list
matrices = [gen_matrix() for _ in range(num_matrices)]
# Here's the Big Sum
matrices = calc_sum(matrices)
# Go!
with dd.Client('localhost:8786') as client:
f = client.submit(compute, matrices)
result = client.gather(f)
这是动态图:
这肯定会奏效,但是随着矩阵的大小(见上文 gen_matrix)变得太大,Dask 分布式工作者开始遇到三个问题:
- 他们向执行求和的主要工作人员发送数据超时
- 主要工作人员在收集所有矩阵时内存不足
- 总和不是运行并行(只有matrix ganeration是)
请注意,其中 none 这些问题是 Dask 的错,它按照宣传的方式工作。我刚刚设置的计算很糟糕。
一个解决方案是将其分解为树计算,如下所示,以及该图的粗略可视化:
from functools import reduce
import math
from dask import delayed, compute, visualize
import dask.distributed as dd
import numpy as np
@delayed
def gen_matrix():
return np.random.rand(1000, 1000)
@delayed
def calc_sum(a, b):
return a + b
if __name__ == '__main__':
num_matrices = 16
# Plop them into a big list
matrices = [gen_matrix() for _ in range(num_matrices)]
# This tells us the depth of the calculation portion
# of the tree we are constructing in the next step
depth = int(math.log(num_matrices, 2))
# This is the code I don't want to have to manually write
for _ in range(depth):
matrices = [
calc_sum(matrices[i], matrices[i+1])
for i in range(0, len(matrices), 2)
]
# Go!
with dd.Client('localhost:8786') as client:
f = client.submit(compute, matrices)
result = client.gather(f)
图表:
问题:
我希望能够通过库或 Dask 本身完成这棵树的生成。我怎样才能做到这一点?
对于那些想知道的人,为什么不直接使用上面的代码呢?因为有些极端情况我不想编写代码,也因为它只是需要编写更多代码:)
我也看到了这个:
functools 或 itertools 中是否有知道如何执行此操作的东西(并且可以与 dask.delayed 一起使用)?
Dask 包有一个 reduction/aggregation 方法可以生成树状 DAG:fold。
工作流程是 'bag' 延迟对象然后折叠它们。