为什么 Dask 的执行速度如此之慢,而多进程的执行速度如此之快?

Why does Dask perform so slower while multiprocessing perform so much faster?

为了更好地理解并行,我比较了一组不同的代码。

这是最基本的 (code_piece_1)。

for循环

import time

# setup
problem_size = 1e7
items = range(9)

# serial
def counter(num=0):
    junk = 0
    for i in range(int(problem_size)):
        junk += 1
        junk -= 1
    return num

def sum_list(args):
    print("sum_list fn:", args)
    return sum(args)

start = time.time()
summed = sum_list([counter(i) for i in items])
print(summed)
print('for loop {}s'.format(time.time() - start))

此代码运行 串行样式(for 循环)的时间使用者并得到此结果

sum_list fn: [0, 1, 2, 3, 4, 5, 6, 7, 8]
36
for loop 8.7735116481781s

多处理

可以将多处理风格视为一种实现并行计算的方式吗?

我假设是,因为 doc 这么说。

这里是code_piece_2

import multiprocessing
start = time.time()
pool = multiprocessing.Pool(len(items))
num_to_sum = pool.map(counter, items)
print(sum_list(num_to_sum))
print('pool.map {}s'.format(time.time() - start))

此代码 运行 多处理风格的同一时间消费者并得到此结果

sum_list fn: [0, 1, 2, 3, 4, 5, 6, 7, 8]
36
pool.map 1.6011056900024414s

显然,在这种特殊情况下,多处理比串行更快。

达斯克

Dask 是一个灵活的库,用于 Python.

中的并行计算

此代码(code_piece_3)运行与Dask同时使用(我不确定我是否正确使用Dask。)

@delayed
def counter(num=0):
    junk = 0
    for i in range(int(problem_size)):
        junk += 1
        junk -= 1
    return num
@delayed
def sum_list(args):
    print("sum_list fn:", args)
    return sum(args)

start = time.time()
summed = sum_list([counter(i) for i in items])
print(summed.compute())
print('dask delayed {}s'.format(time.time() - start))

我得到了

sum_list fn: [0, 1, 2, 3, 4, 5, 6, 7, 8]
36
dask delayed 10.288054704666138s

我的cpu有6个物理核心

问题

为什么 Dask 的执行速度如此之慢,而多进程的执行速度如此之快?

我是不是用错了Dask?如果是,正确的方法是什么?

注意:请结合本案例或其他具体案例进行讨论。请不要泛泛而谈。

Q : Why did parallel computing take longer than a serial one?

因为 有更多指令加载到 CPU 上以供执行("awfully" 许多甚至在指令/预期块的第一步之前计算首先进入 CPU ),然后在纯 [SERIAL] 的情况下,没有附加成本被添加到执行流程中。

对于这些(从源代码中隐藏的)附加操作(您在 [TIME]-域(此类 "preparations" 的持续时间)和 [SPACE]- 中支付域(分配更多 RAM 以包含 [PARALLEL] 操作代码所需的所有相关结构(好吧,如果我们在术语上迂腐且准确的话,通常仍然是 [CONCURRENT] 操作的代码),这又是花费你 [TIME],因为每个 RAM-I/O 花费你超过 [us] ~ 300~380 [ns] 的 1/3 )

结果?

除非您的工作负载包有 "sufficient enough" 工作量,可以并行执行(非阻塞、无锁、无互斥、无共享, 没有依赖性, 没有 I/O, ...确实是独立的, 有最小的 RAM-I/O 重新获取), 很容易 "pay way more than you ever get back".

有关附加成本和对最终加速有如此强烈影响的因素的详细信息,请开始阅读 the criticism of blind using the original, overhead naive formulation of the Amdahl's law here

您拥有的代码需要 GIL,因此一次只有一个任务 运行,您得到的只是额外的开销。例如,如果您使用带有进程的分布式调度程序,那么您将获得更好的性能。

在你的例子中,dask 比 python 多处理慢,因为你没有指定调度程序,所以 dask 使用多线程后端,这是默认的。正如 mdurant 所指出的,您的代码没有释放 GIL,因此多线程无法并行执行任务图。

在此处查看有关该主题的良好概述:https://docs.dask.org/en/stable/scheduler-overview.html

对于您的代码,您可以通过调用以下命令切换到多处理后端: .compute(scheduler='processes')

如果使用多处理后端,所有进程间的通信仍然需要通过主进程。因此,您可能还想查看分布式调度程序,工作进程可以在其中直接相互通信,这对于复杂的任务图尤其有用。此外,分布式调度程序支持工作窃取以平衡进程之间的工作,并有一个 Web 界面提供一些关于 运行 任务的诊断信息。即使您只想在本地计算机上计算,使用分布式调度程序而不是多进程调度程序通常是有意义的。