dask.delayed 结果没有加速

dask.delayed results in no speedup

我正在尝试进入 Dask。为此,我试图并行化我得到的一些耗时的顺序代码。原代码是这样的:

def sequential():
    sims = [] 
    chunksize = len(tokens)//4
    for i in range(0, len(tokens), chunksize):
        print(i, i+chunksize)
        chunk = tokens[i:i+chunksize]
        sims.append(process(chunk))     
    return sims

%time sequential()

并行代码是这样的:

def parallel():
    sims = []
    chunksize = len(tokens)//4
    for i in range(0, len(tokens), chunksize):
        print(i, i+chunksize)
        chunk = dask.delayed(tokens[i:i+chunksize])
        sims.append(dask.delayed(process)(chunk))
    return dask.delayed(sims)

%time parallel().visualize()

但是并行代码总是比并行代码慢 10% 左右。当我可视化 sims 的计算图时,我得到了这个:

不确定 list-#8 的来源,但除此之外它看起来是正确的。那么为什么没有加速呢?当我查看 htop 时,我可以看到 3 个内核处于活动状态(每个大约 30% 负载),而对于顺序代码,我只看到 1 个内核处于活动状态(100% 负载)。顺序代码运行 7 分钟,并行代码运行 7 - 8 分钟。

我想我误解了这里应该如何使用 delayedcompute


设置是这样的,如果你需要的话:

import numpy
import spacy
import dask

nlp = spacy.load('en_core_web_lg')

tokens = [t for t in nlp(" ".join(t.strip() for t in open('./words.txt','r').readlines())) if len(t.text) > 1 and len(t.text) < 20]


def process(chunk):
    sims = numpy.zeros([len(chunk),len(tokens)], dtype=numpy.float32)
    for i in range(len(chunk)):
        for j in range(len(tokens)):
            sims[i,j] = chunk[i].similarity(tokens[j])
    return sims 

您看到此行为是因为 dask 的默认执行引擎基于单个进程中的多个线程("threaded" 调度程序)。 Python 有一个锁,GIL,它通过一次只执行一个 python 语句来确保解释器的安全。因此,每个线程大部分时间都在等待锁可用。 为避免此问题,您有两种选择:

  • 找到一个释放 GIL 的计算版本。如果您可以将其表述为(主要)一些 numpy、pandas、numba 等计算、在 C 级别执行并且不需要解释器的代码,这与您的嵌套循环不同,这是可能的。
  • 运行 您的代码使用进程,使用 "mutiprocessing" 调度程序或(更好)"distributed" 调度程序,尽管名称如此,但也 运行 很好一台机器。

更多信息:http://dask.pydata.org/en/latest/scheduler-overview.html