Python multiprocessing - 如何高效地从地图中分离出来?

Python multiprocessing - How to break from a map efficiently?

map 中断按预期工作:

def worker(x):
    print("worker call x=%s" % x)
    return x

for x in map(worker, range(5)):
    print(x)
    if x == 2:
        break

worker call x=0
0
worker call x=1
1
worker call x=2
2

但是如果我对 multiprocessing 做同样的事情,我会得到这个:

from multiprocessing import Pool

pool = Pool(2)
for x in pool.map(worker, range(5)):
    print(x)
    if x == 2:
        break
pool.close()
pool.join()

0
1
2
worker call x=0
worker call x=1
worker call x=2
worker call x=3
worker call x=4

为什么 multiprocessing 的映射表现不同?如何避免不必要的函数调用?

multiprocessing.Pools 的基本性质是,只要您说 pool.map(...),它就会将传递的可迭代对象中的所有任务提交到队列中,供工作进程执行。一旦这样的任务被放入池中,它最终会被一个工作进程消费并处理。你对结果做的任何事情都不能改变它。

multiprocessing 的 map 行为不同,因为它不像内置 map 那样同步读取 map 可迭代对象,它立即将每个迭代拆分为一个单独的进程并连接结果。

如果您不熟悉并发原则,我将尝试简要解释一下。

在您使用内置映射的第一个示例中,代码将创建一个可迭代对象,允许您按顺序一次执行一个 worker。它一次执行一个并按顺序执行的事实意味着打印 worker call x= 的函数将始终先打印,然后继续执行到循环内部,该循环将仅打印 x 的值。这也意味着当您的循环达到 2 时,您可以退出循环而无需额外调用 map 或循环主体本身。此为同步操作,一切客套,轮到执行。

在您的第二个示例中,使用多处理映射代码仍会创建一个可处理 worker(x) 的可迭代对象。但是,这一次,您不是一次(同步)执行对 worker(x) 的每个调用。 multiprocessing map 调用将立即将所有 map 调用发送到单独的进程以首先执行,然后组合结果。然后,您的循环执行组合结果,并按照您的指示再次在 2 处停止。不幸的是,所有映射条目都已在单独的进程中执行,因此虽然循环体的执行次数最少,但映射却没有。

希望这能帮助您更好地理解原因。

应该注意的是,如果您尝试使用 Python2.x 的第一个版本(我试过),结果应该是:

worker call x=0
worker call x=1
worker call x=2
worker call x=3
worker call x=4
0
1
2

不涉及任何多处理。

不同的是在Python2中,doc表示:

Apply function to every item of iterable and return a list of the results...

当 Python 3 doc 状态:

Return an iterator that applies function to every item of iterable, yielding the results...

这意味着 map 在 Python 3 中被更改为 return 可迭代对象而不是列表。

甚至在 Python 3 中,multiprocessing.pool.Pool.map 医生说:

A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks until the result is ready.

(强调我的)

这意味着该方法首先通过产生多个进程来计算结果列表,然后才 return 一个完整的结果对象,而不是每次子进程结束时都产生一个值。这样,它更接近 Python2 map 内置而不是 Python3 一个。