多处理池 'apply_async' 似乎只调用一次函数
Multiprocessing pool 'apply_async' only seems to call function once
我一直在关注文档以尝试了解多处理池。我想到了这个:
import time
from multiprocessing import Pool
def f(a):
print 'f(' + str(a) + ')'
return True
t = time.time()
pool = Pool(processes=10)
result = pool.apply_async(f, (1,))
print result.get()
pool.close()
print ' [i] Time elapsed ' + str(time.time() - t)
我正在尝试使用 10 个进程来评估函数 f(a)
。我在 f
.
中放置了打印语句
这是我得到的输出:
$ python pooltest.py
f(1)
True
[i] Time elapsed 0.0270888805389
在我看来,函数 f
只被评估了一次。
我可能没有使用正确的方法,但我正在寻找的最终结果是 运行 f
同时处理 10 个进程,并获得每个进程返回的结果过程。所以我会以 10 个结果的列表结尾(可能相同也可能不同)。
关于多处理的文档非常混乱,弄清楚我应该采用哪种方法并非易事,在我看来 f
应该是 运行 我提供的示例中的 10 次以上。
每次您编写 pool.apply_async(...)
时,它都会将该函数调用委托给在池中启动的进程之一。如果要在多个进程中调用该函数,则需要发出多次 pool.apply_async
调用。
注意,还有一个 pool.map
(and pool.map_async
) 函数,它将接受一个函数和一个可迭代的输入:
inputs = range(30)
results = pool.map(f, inputs)
这些函数会将函数应用于 inputs
可迭代对象中的每个输入。它尝试将 "batches" 放入池中,以便负载在池中的所有进程之间得到相当均匀的平衡。
apply_async 并不意味着启动 多个 进程;它只是为了在池的一个进程中使用参数调用函数。如果您希望该函数被调用 10 次,则需要进行 10 次调用。
首先,请注意 apply()
上的文档(强调已添加):
apply(func[, args[, kwds]])
Call func with arguments args and keyword arguments kwds. It blocks
until the result is ready. Given this blocks, apply_async() is better
suited for performing work in parallel. Additionally, func is only
executed in one of the workers of the pool.
现在,在 apply_async()
的文档中:
apply_async(func[, args[, kwds[, callback[, error_callback]]]])
A variant of the apply() method which returns a result object.
两者的区别就在于apply_asyncreturns马上。您可以使用 map()
多次调用函数,但如果您使用相同的输入进行调用,那么创建 相同 参数的列表有点多余有一个正确长度的序列。
但是,如果您使用 相同的 输入调用不同的函数,那么您实际上只是在调用高阶函数,您可以使用 map
或 map_async()
像这样:
multiprocessing.map(lambda f: f(1), functions)
除了 lambda 函数不可 pickleable,因此您需要使用定义的函数(参见 How to let Pool.map take a lambda function)。您实际上可以使用内置的 apply()
(不是多处理的)(尽管它已被弃用):
multiprocessing.map(apply,[(f,1) for f in functions])
自己编写也很容易:
def apply_(f,*args,**kwargs):
return f(*args,**kwargs)
multiprocessing.map(apply_,[(f,1) for f in functions])
如果您出于任何特定原因不致力于 Pool,我已经围绕 multiprocessing.Process 编写了一个函数,它可能会为您解决问题。它已发布 here,但如果您需要,我很乐意将最新版本上传到 github。
如果您想 运行 十个进程中的一段代码,然后每个进程退出,那么 Pool
十个进程可能不适合使用。
相反,创建十个 Process
es 到 运行 代码:
processes = []
for _ in range(10):
p = multiprocessing.Process(target=f, args=(1,))
p.start()
processes.append(p)
for p in processes:
p.join()
multiprocessing.Pool
class 旨在处理进程数和作业数不相关的情况。通常选择的进程数是您拥有的 CPU 核心数,而作业数要大得多。谢谢!
我一直在关注文档以尝试了解多处理池。我想到了这个:
import time
from multiprocessing import Pool
def f(a):
print 'f(' + str(a) + ')'
return True
t = time.time()
pool = Pool(processes=10)
result = pool.apply_async(f, (1,))
print result.get()
pool.close()
print ' [i] Time elapsed ' + str(time.time() - t)
我正在尝试使用 10 个进程来评估函数 f(a)
。我在 f
.
这是我得到的输出:
$ python pooltest.py
f(1)
True
[i] Time elapsed 0.0270888805389
在我看来,函数 f
只被评估了一次。
我可能没有使用正确的方法,但我正在寻找的最终结果是 运行 f
同时处理 10 个进程,并获得每个进程返回的结果过程。所以我会以 10 个结果的列表结尾(可能相同也可能不同)。
关于多处理的文档非常混乱,弄清楚我应该采用哪种方法并非易事,在我看来 f
应该是 运行 我提供的示例中的 10 次以上。
每次您编写 pool.apply_async(...)
时,它都会将该函数调用委托给在池中启动的进程之一。如果要在多个进程中调用该函数,则需要发出多次 pool.apply_async
调用。
注意,还有一个 pool.map
(and pool.map_async
) 函数,它将接受一个函数和一个可迭代的输入:
inputs = range(30)
results = pool.map(f, inputs)
这些函数会将函数应用于 inputs
可迭代对象中的每个输入。它尝试将 "batches" 放入池中,以便负载在池中的所有进程之间得到相当均匀的平衡。
apply_async 并不意味着启动 多个 进程;它只是为了在池的一个进程中使用参数调用函数。如果您希望该函数被调用 10 次,则需要进行 10 次调用。
首先,请注意 apply()
上的文档(强调已添加):
apply(func[, args[, kwds]])
Call func with arguments args and keyword arguments kwds. It blocks until the result is ready. Given this blocks, apply_async() is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool.
现在,在 apply_async()
的文档中:
apply_async(func[, args[, kwds[, callback[, error_callback]]]])
A variant of the apply() method which returns a result object.
两者的区别就在于apply_asyncreturns马上。您可以使用 map()
多次调用函数,但如果您使用相同的输入进行调用,那么创建 相同 参数的列表有点多余有一个正确长度的序列。
但是,如果您使用 相同的 输入调用不同的函数,那么您实际上只是在调用高阶函数,您可以使用 map
或 map_async()
像这样:
multiprocessing.map(lambda f: f(1), functions)
除了 lambda 函数不可 pickleable,因此您需要使用定义的函数(参见 How to let Pool.map take a lambda function)。您实际上可以使用内置的 apply()
(不是多处理的)(尽管它已被弃用):
multiprocessing.map(apply,[(f,1) for f in functions])
自己编写也很容易:
def apply_(f,*args,**kwargs):
return f(*args,**kwargs)
multiprocessing.map(apply_,[(f,1) for f in functions])
如果您出于任何特定原因不致力于 Pool,我已经围绕 multiprocessing.Process 编写了一个函数,它可能会为您解决问题。它已发布 here,但如果您需要,我很乐意将最新版本上传到 github。
如果您想 运行 十个进程中的一段代码,然后每个进程退出,那么 Pool
十个进程可能不适合使用。
相反,创建十个 Process
es 到 运行 代码:
processes = []
for _ in range(10):
p = multiprocessing.Process(target=f, args=(1,))
p.start()
processes.append(p)
for p in processes:
p.join()
multiprocessing.Pool
class 旨在处理进程数和作业数不相关的情况。通常选择的进程数是您拥有的 CPU 核心数,而作业数要大得多。谢谢!