在函数中调用多处理池非常慢

Calling multiprocessing pool within a function is very slow

我正在尝试使用 pathos 来触发函数内的多处理。但是,我注意到一个奇怪的行为,但不知道为什么:

import spacy
from pathos.multiprocessing import ProcessPool as Pool


nlp = spacy.load("es_core_news_sm")

def preworker(text, nlp):
    return [w.lemma_ for w in nlp(text)]

worker = lambda text: preworker(text, nlp)

texts = ["Este es un texto muy interesante en español"] * 10

# Run this in jupyter:
%%time

pool = Pool(3)
r = pool.map(worker, texts)

输出为

CPU times: user 6.6 ms, sys: 26.5 ms, total: 33.1 ms
Wall time: 141 ms

到目前为止一切顺利...现在我定义了完全相同的计算,但是来自一个函数:

def out_worker(texts, nlp):
    worker = lambda text: preworker(text, nlp)
    pool = Pool(3)
    return pool.map(worker, texts)

# Run this in jupyter:
%%time 

r = out_worker(texts, nlp)

现在的输出是

CPU times: user 10.2 s, sys: 591 ms, total: 10.8 s
Wall time: 13.4 s

为什么会有这么大的差异?我的假设是,尽管我不知道为什么,在第二种情况下,nlp 对象的副本被发送到每个单一工作。

另外,我怎样才能从一个函数中正确调用这个多处理?

谢谢


编辑:

为了问题的重现性,这里有一个 Python 显示情况的脚本:

import spacy
from pathos.multiprocessing import ProcessPool as Pool
import time

# Install with python -m spacy download es_core_news_sm
nlp = spacy.load("es_core_news_sm")

def preworker(text, nlp):
    return [w.lemma_ for w in nlp(text)]

worker = lambda text: preworker(text, nlp)

texts = ["Este es un texto muy interesante en español"] * 10

st = time.time()
pool = Pool(3)
r = pool.map(worker, texts)
print(f"Usual pool took {time.time()-st:.3f} seconds")

def out_worker(texts, nlp):
    worker = lambda text: preworker(text, nlp)
    pool = Pool(3)
    return pool.map(worker, texts)

st = time.time()
r = out_worker(texts, nlp)
print(f"Pool within a function took {time.time()-st:.3f} seconds")

def out_worker2(texts, nlp, pool):     
    worker = lambda text: preworker(text, nlp)     
    return pool.map(worker, texts)

st = time.time()
pool = Pool(3) 
r = out_worker2(texts, nlp, pool)
print(f"Pool passed to a function took {time.time()-st:.3f} seconds")

在我的例子中,输出是这样的:

Usual pool took 0.219 seconds
Pool within a function took 8.164 seconds
Pool passed to a function took 8.265 seconds

spacy nlp 对象相当重(几 MB)。我的 spacy 版本是 3.0.3

而不是 from pathos.multiprocessing import ProcessPool as Pool, 我用过 from multiprocess import Pool,本质上是一样的。然后我尝试了一些替代方法。

所以:

from multiprocess import Pool

“通常”情况的结果为 0.1s,其他两种情况的结果为 12.5s

但是:

from multiprocess import Pool
import dill 
dill.settings['recurse'] = True

所有三种情况的产量 12.5s

最后:

from multiprocess.dummy import Pool

所有三种情况的产量 0.1s

这告诉我,这肯定是一个序列化问题,globals的序列化才是速度的关键。

在第一种情况下,默认的 dill 行为是尽可能避免通过 globals 进行递归。它能够以“通常”的方式成功完成此操作,但不能用于函数内的其他两个调用。

当我第一次导入 dill 并将全局变量的行为切换为 recurse 时(这就是 cloudpickle 进行酸洗的方式),然后它在所有三次尝试中都很慢(“包括通常的”方式。

最后,如果我使用 multiprocess.dummy,因此 ThreadPool -- 它不需要序列化全局变量,您可以看到它在所有情况下都很快。

结论:如果可行,请使用 pathos.pools.ThreadPoolmultiprocess.dummy.Pool。否则,请确保您 运行 没有序列化全局变量。

dill 中有一个有用的工具,您可以使用它来查看正在序列化的内容。如果你包含 dill.detect.trace(True),那么 dill 会为它正在序列化的对象吐出一堆代码,因为它会递归地 pickle 对象及其依赖项。您必须查看 dill 源代码以匹配键(例如 F1 是一种特定类型的函数对象,而 D1 是一种特定类型的字典)。您可以看到不同的方法如何序列化不同的底层对象。不幸的是我没有分析器,所以你不能立即看到速度在哪里,但你可以看到它采取的不同策略。

我只是尽量避免序列化 nlp 对象,或任何导致速度变慢的对象(可能是 nlp 对象)。

例如,您可以这样代替在函数中传递 nlp 对象:

import spacy
from multiprocess import Pool
import time

# Install with python -m spacy download es_core_news_sm
nlp = spacy.load("es_core_news_sm")

def preworker(text, nlp):
    return [w.lemma_ for w in nlp(text)]

worker = lambda text: preworker(text, nlp)

texts = ["Este es un texto muy interesante en espanol"] * 10

st = time.time()
pool = Pool(3)
r = pool.map(worker, texts)
pool.close(); pool.join()
print("Usual pool took {0:.3f} seconds".format(time.time()-st))

def out_worker(texts):
    worker = lambda text: preworker(text, nlp)
    pool = Pool(3)
    res = pool.map(worker, texts)
    pool.close(); pool.join()
    return res

st = time.time()
r = out_worker(texts)
print("Pool within a function took {0:.3f} seconds".format(time.time()-st))

通过引用查找传递 nlp 而不是显式地通过函数参数,两种情况下的速度都是 0.1s