多处理池 'apply_async' 似乎只调用一次函数

Multiprocessing pool 'apply_async' only seems to call function once

我一直在关注文档以尝试了解多处理池。我想到了这个:

import time
from multiprocessing import Pool

def f(a):
    print 'f(' + str(a) + ')'
    return True

t = time.time()
pool = Pool(processes=10)
result = pool.apply_async(f, (1,))
print result.get()
pool.close()
print ' [i] Time elapsed ' + str(time.time() - t)

我正在尝试使用 10 个进程来评估函数 f(a)。我在 f.

中放置了打印语句

这是我得到的输出:

$ python pooltest.py 
f(1)
True
 [i] Time elapsed 0.0270888805389

在我看来,函数 f 只被评估了一次。

我可能没有使用正确的方法,但我正在寻找的最终结果是 运行 f 同时处理 10 个进程,并获得每个进程返回的结果过程。所以我会以 10 个结果的列表结尾(可能相同也可能不同)。

关于多处理的文档非常混乱,弄清楚我应该采用哪种方法并非易事,在我看来 f 应该是 运行 我提供的示例中的 10 次以上。

每次您编写 pool.apply_async(...) 时,它都会将该函数调用委托给在池中启动的进程之一。如果要在多个进程中调用该函数,则需要发出多次 pool.apply_async 调用。

注意,还有一个 pool.map (and pool.map_async) 函数,它将接受一个函数和一个可迭代的输入:

inputs = range(30)
results = pool.map(f, inputs)

这些函数会将函数应用于 inputs 可迭代对象中的每个输入。它尝试将 "batches" 放入池中,以便负载在池中的所有进程之间得到相当均匀的平衡。

apply_async 并不意味着启动 多个 进程;它只是为了在池的一个进程中使用参数调用函数。如果您希望该函数被调用 10 次,则需要进行 10 次调用。

首先,请注意 apply() 上的文档(强调已添加):

apply(func[, args[, kwds]])

Call func with arguments args and keyword arguments kwds. It blocks until the result is ready. Given this blocks, apply_async() is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool.

现在,在 apply_async() 的文档中:

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

A variant of the apply() method which returns a result object.

两者的区别就在于apply_asyncreturns马上。您可以使用 map() 多次调用函数,但如果您使用相同的输入进行调用,那么创建 相同 参数的列表有点多余有一个正确长度的序列。

但是,如果您使用 相同的 输入调用不同的函数,那么您实际上只是在调用高阶函数,您可以使用 mapmap_async() 像这样:

multiprocessing.map(lambda f: f(1), functions)

除了 lambda 函数不可 pickleable,因此您需要使用定义的函数(参见 How to let Pool.map take a lambda function)。您实际上可以使用内置的 apply() (不是多处理的)(尽管它已被弃用):

multiprocessing.map(apply,[(f,1) for f in functions])

自己编写也很容易:

def apply_(f,*args,**kwargs):
  return f(*args,**kwargs)

multiprocessing.map(apply_,[(f,1) for f in functions])

如果您出于任何特定原因不致力于 Pool,我已经围绕 multiprocessing.Process 编写了一个函数,它可能会为您解决问题。它已发布 here,但如果您需要,我很乐意将最新版本上传到 github。

如果您想 运行 十个进程中的一段代码,然后每个进程退出,那么 Pool 十个进程可能不适合使用。

相反,创建十个 Processes 到 运行 代码:

processes = []

for _ in range(10):
    p = multiprocessing.Process(target=f, args=(1,))
    p.start()
    processes.append(p)

for p in processes:
    p.join()

multiprocessing.Pool class 旨在处理进程数和作业数不相关的情况。通常选择的进程数是您拥有的 CPU 核心数,而作业数要大得多。谢谢!