dask.delayed 结果没有加速
dask.delayed results in no speedup
我正在尝试进入 Dask。为此,我试图并行化我得到的一些耗时的顺序代码。原代码是这样的:
def sequential():
sims = []
chunksize = len(tokens)//4
for i in range(0, len(tokens), chunksize):
print(i, i+chunksize)
chunk = tokens[i:i+chunksize]
sims.append(process(chunk))
return sims
%time sequential()
并行代码是这样的:
def parallel():
sims = []
chunksize = len(tokens)//4
for i in range(0, len(tokens), chunksize):
print(i, i+chunksize)
chunk = dask.delayed(tokens[i:i+chunksize])
sims.append(dask.delayed(process)(chunk))
return dask.delayed(sims)
%time parallel().visualize()
但是并行代码总是比并行代码慢 10% 左右。当我可视化 sims
的计算图时,我得到了这个:
不确定 list-#8
的来源,但除此之外它看起来是正确的。那么为什么没有加速呢?当我查看 htop 时,我可以看到 3 个内核处于活动状态(每个大约 30% 负载),而对于顺序代码,我只看到 1 个内核处于活动状态(100% 负载)。顺序代码运行 7 分钟,并行代码运行 7 - 8 分钟。
我想我误解了这里应该如何使用 delayed
和 compute
?
设置是这样的,如果你需要的话:
import numpy
import spacy
import dask
nlp = spacy.load('en_core_web_lg')
tokens = [t for t in nlp(" ".join(t.strip() for t in open('./words.txt','r').readlines())) if len(t.text) > 1 and len(t.text) < 20]
def process(chunk):
sims = numpy.zeros([len(chunk),len(tokens)], dtype=numpy.float32)
for i in range(len(chunk)):
for j in range(len(tokens)):
sims[i,j] = chunk[i].similarity(tokens[j])
return sims
您看到此行为是因为 dask 的默认执行引擎基于单个进程中的多个线程("threaded" 调度程序)。 Python 有一个锁,GIL,它通过一次只执行一个 python 语句来确保解释器的安全。因此,每个线程大部分时间都在等待锁可用。
为避免此问题,您有两种选择:
- 找到一个释放 GIL 的计算版本。如果您可以将其表述为(主要)一些 numpy、pandas、numba 等计算、在 C 级别执行并且不需要解释器的代码,这与您的嵌套循环不同,这是可能的。
- 运行 您的代码使用进程,使用 "mutiprocessing" 调度程序或(更好)"distributed" 调度程序,尽管名称如此,但也 运行 很好一台机器。
更多信息:http://dask.pydata.org/en/latest/scheduler-overview.html
我正在尝试进入 Dask。为此,我试图并行化我得到的一些耗时的顺序代码。原代码是这样的:
def sequential():
sims = []
chunksize = len(tokens)//4
for i in range(0, len(tokens), chunksize):
print(i, i+chunksize)
chunk = tokens[i:i+chunksize]
sims.append(process(chunk))
return sims
%time sequential()
并行代码是这样的:
def parallel():
sims = []
chunksize = len(tokens)//4
for i in range(0, len(tokens), chunksize):
print(i, i+chunksize)
chunk = dask.delayed(tokens[i:i+chunksize])
sims.append(dask.delayed(process)(chunk))
return dask.delayed(sims)
%time parallel().visualize()
但是并行代码总是比并行代码慢 10% 左右。当我可视化 sims
的计算图时,我得到了这个:
不确定 list-#8
的来源,但除此之外它看起来是正确的。那么为什么没有加速呢?当我查看 htop 时,我可以看到 3 个内核处于活动状态(每个大约 30% 负载),而对于顺序代码,我只看到 1 个内核处于活动状态(100% 负载)。顺序代码运行 7 分钟,并行代码运行 7 - 8 分钟。
我想我误解了这里应该如何使用 delayed
和 compute
?
设置是这样的,如果你需要的话:
import numpy
import spacy
import dask
nlp = spacy.load('en_core_web_lg')
tokens = [t for t in nlp(" ".join(t.strip() for t in open('./words.txt','r').readlines())) if len(t.text) > 1 and len(t.text) < 20]
def process(chunk):
sims = numpy.zeros([len(chunk),len(tokens)], dtype=numpy.float32)
for i in range(len(chunk)):
for j in range(len(tokens)):
sims[i,j] = chunk[i].similarity(tokens[j])
return sims
您看到此行为是因为 dask 的默认执行引擎基于单个进程中的多个线程("threaded" 调度程序)。 Python 有一个锁,GIL,它通过一次只执行一个 python 语句来确保解释器的安全。因此,每个线程大部分时间都在等待锁可用。 为避免此问题,您有两种选择:
- 找到一个释放 GIL 的计算版本。如果您可以将其表述为(主要)一些 numpy、pandas、numba 等计算、在 C 级别执行并且不需要解释器的代码,这与您的嵌套循环不同,这是可能的。
- 运行 您的代码使用进程,使用 "mutiprocessing" 调度程序或(更好)"distributed" 调度程序,尽管名称如此,但也 运行 很好一台机器。
更多信息:http://dask.pydata.org/en/latest/scheduler-overview.html