在函数中调用多处理池非常慢
Calling multiprocessing pool within a function is very slow
我正在尝试使用 pathos 来触发函数内的多处理。但是,我注意到一个奇怪的行为,但不知道为什么:
import spacy
from pathos.multiprocessing import ProcessPool as Pool
nlp = spacy.load("es_core_news_sm")
def preworker(text, nlp):
return [w.lemma_ for w in nlp(text)]
worker = lambda text: preworker(text, nlp)
texts = ["Este es un texto muy interesante en español"] * 10
# Run this in jupyter:
%%time
pool = Pool(3)
r = pool.map(worker, texts)
输出为
CPU times: user 6.6 ms, sys: 26.5 ms, total: 33.1 ms
Wall time: 141 ms
到目前为止一切顺利...现在我定义了完全相同的计算,但是来自一个函数:
def out_worker(texts, nlp):
worker = lambda text: preworker(text, nlp)
pool = Pool(3)
return pool.map(worker, texts)
# Run this in jupyter:
%%time
r = out_worker(texts, nlp)
现在的输出是
CPU times: user 10.2 s, sys: 591 ms, total: 10.8 s
Wall time: 13.4 s
为什么会有这么大的差异?我的假设是,尽管我不知道为什么,在第二种情况下,nlp 对象的副本被发送到每个单一工作。
另外,我怎样才能从一个函数中正确调用这个多处理?
谢谢
编辑:
为了问题的重现性,这里有一个 Python 显示情况的脚本:
import spacy
from pathos.multiprocessing import ProcessPool as Pool
import time
# Install with python -m spacy download es_core_news_sm
nlp = spacy.load("es_core_news_sm")
def preworker(text, nlp):
return [w.lemma_ for w in nlp(text)]
worker = lambda text: preworker(text, nlp)
texts = ["Este es un texto muy interesante en español"] * 10
st = time.time()
pool = Pool(3)
r = pool.map(worker, texts)
print(f"Usual pool took {time.time()-st:.3f} seconds")
def out_worker(texts, nlp):
worker = lambda text: preworker(text, nlp)
pool = Pool(3)
return pool.map(worker, texts)
st = time.time()
r = out_worker(texts, nlp)
print(f"Pool within a function took {time.time()-st:.3f} seconds")
def out_worker2(texts, nlp, pool):
worker = lambda text: preworker(text, nlp)
return pool.map(worker, texts)
st = time.time()
pool = Pool(3)
r = out_worker2(texts, nlp, pool)
print(f"Pool passed to a function took {time.time()-st:.3f} seconds")
在我的例子中,输出是这样的:
Usual pool took 0.219 seconds
Pool within a function took 8.164 seconds
Pool passed to a function took 8.265 seconds
spacy nlp 对象相当重(几 MB)。我的 spacy 版本是 3.0.3
而不是
from pathos.multiprocessing import ProcessPool as Pool
, 我用过
from multiprocess import Pool
,本质上是一样的。然后我尝试了一些替代方法。
所以:
from multiprocess import Pool
“通常”情况的结果为 0.1s
,其他两种情况的结果为 12.5s
。
但是:
from multiprocess import Pool
import dill
dill.settings['recurse'] = True
所有三种情况的产量 12.5s
。
最后:
from multiprocess.dummy import Pool
所有三种情况的产量 0.1s
。
这告诉我,这肯定是一个序列化问题,globals
的序列化才是速度的关键。
在第一种情况下,默认的 dill
行为是尽可能避免通过 globals
进行递归。它能够以“通常”的方式成功完成此操作,但不能用于函数内的其他两个调用。
当我第一次导入 dill
并将全局变量的行为切换为 recurse
时(这就是 cloudpickle
进行酸洗的方式),然后它在所有三次尝试中都很慢(“包括通常的”方式。
最后,如果我使用 multiprocess.dummy
,因此 ThreadPool
-- 它不需要序列化全局变量,您可以看到它在所有情况下都很快。
结论:如果可行,请使用 pathos.pools.ThreadPool
或 multiprocess.dummy.Pool
。否则,请确保您 运行 没有序列化全局变量。
dill
中有一个有用的工具,您可以使用它来查看正在序列化的内容。如果你包含 dill.detect.trace(True)
,那么 dill 会为它正在序列化的对象吐出一堆代码,因为它会递归地 pickle 对象及其依赖项。您必须查看 dill
源代码以匹配键(例如 F1
是一种特定类型的函数对象,而 D1
是一种特定类型的字典)。您可以看到不同的方法如何序列化不同的底层对象。不幸的是我没有分析器,所以你不能立即看到速度在哪里,但你可以看到它采取的不同策略。
我只是尽量避免序列化 nlp
对象,或任何导致速度变慢的对象(可能是 nlp
对象)。
例如,您可以这样代替在函数中传递 nlp 对象:
import spacy
from multiprocess import Pool
import time
# Install with python -m spacy download es_core_news_sm
nlp = spacy.load("es_core_news_sm")
def preworker(text, nlp):
return [w.lemma_ for w in nlp(text)]
worker = lambda text: preworker(text, nlp)
texts = ["Este es un texto muy interesante en espanol"] * 10
st = time.time()
pool = Pool(3)
r = pool.map(worker, texts)
pool.close(); pool.join()
print("Usual pool took {0:.3f} seconds".format(time.time()-st))
def out_worker(texts):
worker = lambda text: preworker(text, nlp)
pool = Pool(3)
res = pool.map(worker, texts)
pool.close(); pool.join()
return res
st = time.time()
r = out_worker(texts)
print("Pool within a function took {0:.3f} seconds".format(time.time()-st))
通过引用查找传递 nlp
而不是显式地通过函数参数,两种情况下的速度都是 0.1s
。
我正在尝试使用 pathos 来触发函数内的多处理。但是,我注意到一个奇怪的行为,但不知道为什么:
import spacy
from pathos.multiprocessing import ProcessPool as Pool
nlp = spacy.load("es_core_news_sm")
def preworker(text, nlp):
return [w.lemma_ for w in nlp(text)]
worker = lambda text: preworker(text, nlp)
texts = ["Este es un texto muy interesante en español"] * 10
# Run this in jupyter:
%%time
pool = Pool(3)
r = pool.map(worker, texts)
输出为
CPU times: user 6.6 ms, sys: 26.5 ms, total: 33.1 ms
Wall time: 141 ms
到目前为止一切顺利...现在我定义了完全相同的计算,但是来自一个函数:
def out_worker(texts, nlp):
worker = lambda text: preworker(text, nlp)
pool = Pool(3)
return pool.map(worker, texts)
# Run this in jupyter:
%%time
r = out_worker(texts, nlp)
现在的输出是
CPU times: user 10.2 s, sys: 591 ms, total: 10.8 s
Wall time: 13.4 s
为什么会有这么大的差异?我的假设是,尽管我不知道为什么,在第二种情况下,nlp 对象的副本被发送到每个单一工作。
另外,我怎样才能从一个函数中正确调用这个多处理?
谢谢
编辑:
为了问题的重现性,这里有一个 Python 显示情况的脚本:
import spacy
from pathos.multiprocessing import ProcessPool as Pool
import time
# Install with python -m spacy download es_core_news_sm
nlp = spacy.load("es_core_news_sm")
def preworker(text, nlp):
return [w.lemma_ for w in nlp(text)]
worker = lambda text: preworker(text, nlp)
texts = ["Este es un texto muy interesante en español"] * 10
st = time.time()
pool = Pool(3)
r = pool.map(worker, texts)
print(f"Usual pool took {time.time()-st:.3f} seconds")
def out_worker(texts, nlp):
worker = lambda text: preworker(text, nlp)
pool = Pool(3)
return pool.map(worker, texts)
st = time.time()
r = out_worker(texts, nlp)
print(f"Pool within a function took {time.time()-st:.3f} seconds")
def out_worker2(texts, nlp, pool):
worker = lambda text: preworker(text, nlp)
return pool.map(worker, texts)
st = time.time()
pool = Pool(3)
r = out_worker2(texts, nlp, pool)
print(f"Pool passed to a function took {time.time()-st:.3f} seconds")
在我的例子中,输出是这样的:
Usual pool took 0.219 seconds
Pool within a function took 8.164 seconds
Pool passed to a function took 8.265 seconds
spacy nlp 对象相当重(几 MB)。我的 spacy 版本是 3.0.3
而不是
from pathos.multiprocessing import ProcessPool as Pool
, 我用过
from multiprocess import Pool
,本质上是一样的。然后我尝试了一些替代方法。
所以:
from multiprocess import Pool
“通常”情况的结果为 0.1s
,其他两种情况的结果为 12.5s
。
但是:
from multiprocess import Pool
import dill
dill.settings['recurse'] = True
所有三种情况的产量 12.5s
。
最后:
from multiprocess.dummy import Pool
所有三种情况的产量 0.1s
。
这告诉我,这肯定是一个序列化问题,globals
的序列化才是速度的关键。
在第一种情况下,默认的 dill
行为是尽可能避免通过 globals
进行递归。它能够以“通常”的方式成功完成此操作,但不能用于函数内的其他两个调用。
当我第一次导入 dill
并将全局变量的行为切换为 recurse
时(这就是 cloudpickle
进行酸洗的方式),然后它在所有三次尝试中都很慢(“包括通常的”方式。
最后,如果我使用 multiprocess.dummy
,因此 ThreadPool
-- 它不需要序列化全局变量,您可以看到它在所有情况下都很快。
结论:如果可行,请使用 pathos.pools.ThreadPool
或 multiprocess.dummy.Pool
。否则,请确保您 运行 没有序列化全局变量。
dill
中有一个有用的工具,您可以使用它来查看正在序列化的内容。如果你包含 dill.detect.trace(True)
,那么 dill 会为它正在序列化的对象吐出一堆代码,因为它会递归地 pickle 对象及其依赖项。您必须查看 dill
源代码以匹配键(例如 F1
是一种特定类型的函数对象,而 D1
是一种特定类型的字典)。您可以看到不同的方法如何序列化不同的底层对象。不幸的是我没有分析器,所以你不能立即看到速度在哪里,但你可以看到它采取的不同策略。
我只是尽量避免序列化 nlp
对象,或任何导致速度变慢的对象(可能是 nlp
对象)。
例如,您可以这样代替在函数中传递 nlp 对象:
import spacy
from multiprocess import Pool
import time
# Install with python -m spacy download es_core_news_sm
nlp = spacy.load("es_core_news_sm")
def preworker(text, nlp):
return [w.lemma_ for w in nlp(text)]
worker = lambda text: preworker(text, nlp)
texts = ["Este es un texto muy interesante en espanol"] * 10
st = time.time()
pool = Pool(3)
r = pool.map(worker, texts)
pool.close(); pool.join()
print("Usual pool took {0:.3f} seconds".format(time.time()-st))
def out_worker(texts):
worker = lambda text: preworker(text, nlp)
pool = Pool(3)
res = pool.map(worker, texts)
pool.close(); pool.join()
return res
st = time.time()
r = out_worker(texts)
print("Pool within a function took {0:.3f} seconds".format(time.time()-st))
通过引用查找传递 nlp
而不是显式地通过函数参数,两种情况下的速度都是 0.1s
。