Is there a concurrent.futures.wait equivalent for the multiprocessing library?

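I have a multiprocessing.Pool of workers, and I want to block the main thread until one of the workers finishes. With concurrent.futures.ProcessPoolExecutor, I can use concurrent.futures.wait to wait for the first of the tasks I submitted to the executor to complete. That unblocks the main thread so it can do something with the result. Is there an equivalent for multiprocessing.pool? I could use a queue, have the child put its result on the queue, and have the parent wait on it, but I'm looking for something closer to the wait function.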

Example:

from concurrent.futures import ProcessPoolExecutor, wait, FIRST_COMPLETED
from random import random
from time import sleep

class Example:
    def start(self):
        with ProcessPoolExecutor(max_workers=10) as executor:
            futures = set()
            for i in range(10):
                futures.add(executor.submit(self.worker, i))
            while futures:
                done, futures = wait(futures, return_when=FIRST_COMPLETED)
                print(f"{len(done)} tasks completed")
                for task in done:
                    res = task.result()
                    print(f"Worker {res} finished")

    def worker(self, workerid):
        sleep(random())
        return workerid

# Required when processes are started with "spawn" (Windows, macOS):
if __name__ == "__main__":
    ex = Example()
    ex.start()

Output:

1 tasks completed
Worker 6 finished
1 tasks completed
Worker 4 finished
1 tasks completed
Worker 9 finished
1 tasks completed
Worker 8 finished
1 tasks completed
Worker 0 finished
1 tasks completed
Worker 3 finished
1 tasks completed
Worker 7 finished
1 tasks completed
Worker 2 finished
1 tasks completed
Worker 1 finished
1 tasks completed
Worker 5 finished

I hope I've understood your question. It looks like you can use the pool.imap_unordered function to iterate over the results as they complete:

from time import sleep
from random import random
from multiprocessing import Pool


def worker_fn(workerid):
    sleep(random())
    return workerid


if __name__ == "__main__":
    with Pool() as pool:
        for result in pool.imap_unordered(worker_fn, range(10)):
            print(f"Worker {result} finished")

Prints:

Worker 3 finished
Worker 6 finished
Worker 8 finished
Worker 9 finished
Worker 1 finished
Worker 2 finished
Worker 5 finished
Worker 4 finished
Worker 7 finished
Worker 0 finished
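Here worker_fn happens to return its own argument, so each result already identifies its task. If your worker returns something else, one option is to wrap it so every result comes back paired with its input. A minimal sketch; `square` and `tagged_call` are hypothetical names used only for illustration, not part of multiprocessing:

```python
from multiprocessing import Pool


def square(x):
    # Hypothetical worker whose result does not identify its input.
    return x * x


def tagged_call(arg):
    # Return the input alongside the result so the caller knows which task
    # finished. Defined at module level so it can be pickled for the pool.
    return arg, square(arg)


if __name__ == "__main__":
    with Pool() as pool:
        # imap_unordered yields each (arg, result) pair as soon as it is ready.
        for arg, result in pool.imap_unordered(tagged_call, range(10)):
            print(f"Task {arg} finished with result {result}")
```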

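The answer given by Andrej Kesely is certainly the most straightforward, and it is more or less equivalent to using the concurrent.futures.as_completed function. By definition, though, each iteration always gives you exactly one completed task, even if two or more tasks completed between iterations.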
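For comparison, here is a minimal sketch of the concurrent.futures.as_completed version mentioned above. Like imap_unordered, it hands back one finished future per iteration. `iter_as_completed` is a hypothetical helper name:

```python
from concurrent.futures import ProcessPoolExecutor, as_completed
from random import random
from time import sleep


def worker(workerid):
    sleep(random())
    return workerid


def iter_as_completed(executor, fn, args):
    """Submit fn(arg) for each arg and yield results in completion order."""
    futures = [executor.submit(fn, arg) for arg in args]
    # as_completed yields exactly one future per iteration, in the order the
    # futures finish, no matter how many of them finished at the same time.
    for future in as_completed(futures):
        yield future.result()


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=10) as executor:
        for res in iter_as_completed(executor, worker, range(10)):
            print(f"Worker {res} finished")
```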

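The following is the closest to what your code is doing. Tasks rarely finish so close together that a single iteration yields more than one completed task, although it is possible (see the demo run below, where it does happen). So I would still expect the count printed on most iterations to be 1: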

from multiprocessing import Pool
from threading import Event
from random import random
from time import sleep

class Example:

    def start(self):

        def my_callback(result):
            nonlocal event
            # Show a task has completed:
            event.set()

        pool = Pool(10)
        event = Event()
        async_results = {pool.apply_async(self.worker, args=(i,), callback=my_callback) for i in range(10)}
        while async_results:
            # Wait for a task to complete:
            event.wait()
            event.clear()
            done = set()
            for async_result in async_results:
                if async_result.ready():
                    res = async_result.get()
                    print(f"Worker {res} finished")
                    done.add(async_result)

            tasks_completed = len(done)
            if tasks_completed:
                print(f"{tasks_completed} task(s) completed")
                async_results -= done
        pool.close()
        pool.join()

    def worker(self, workerid):
        sleep(random())
        return workerid

# In case we are running under Windows:
if __name__ == '__main__':
    ex = Example()
    ex.start()

Prints:

Worker 7 finished
1 task(s) completed
Worker 4 finished
Worker 9 finished
2 task(s) completed
Worker 2 finished
1 task(s) completed
Worker 1 finished
1 task(s) completed
Worker 8 finished
1 task(s) completed
Worker 5 finished
1 task(s) completed
Worker 0 finished
1 task(s) completed
Worker 6 finished
1 task(s) completed
Worker 3 finished
1 task(s) completed

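If the worker is changed as follows, it becomes much more likely that several tasks finish at nearly the same time, so an iteration quite often finds that more than one task has completed: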

    def worker(self, workerid):
        #sleep(random())
        sleep(.1)
        return workerid

Prints:

Worker 0 finished
1 task(s) completed
Worker 1 finished
Worker 2 finished
2 task(s) completed
Worker 5 finished
Worker 8 finished
Worker 7 finished
Worker 4 finished
Worker 9 finished
Worker 3 finished
6 task(s) completed
Worker 6 finished
1 task(s) completed
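If you want something shaped more like concurrent.futures.wait(..., return_when=FIRST_COMPLETED), one option is to let each task's callback record exactly which task finished. The main thread then never has to scan with ready() and never wakes up to an empty batch. This is a minimal sketch under that assumption, not a multiprocessing API: `submit_all` and `wait_some` are hypothetical helper names, and a threading.Condition plus a list replaces the Event:

```python
from multiprocessing import Pool
from random import random
from threading import Condition
from time import sleep


def worker(workerid):
    sleep(random())
    return workerid


def submit_all(pool, fn, args_list):
    """Submit fn(*args) for each args tuple; return (async_results, wait_some).

    wait_some() blocks until at least one not-yet-reported task has finished
    and returns the indices (into async_results) of every task that finished
    since the previous call. Only call it while tasks are still outstanding.
    """
    cond = Condition()
    finished = []  # indices reported by callbacks but not yet handed out

    def make_callback(index):
        def on_done(_value):
            # Runs in the pool's result-handler thread, once per task,
            # whether the task returned a value or raised an exception.
            with cond:
                finished.append(index)
                cond.notify()
        return on_done

    async_results = [
        pool.apply_async(fn, args, callback=make_callback(i),
                         error_callback=make_callback(i))
        for i, args in enumerate(args_list)
    ]

    def wait_some():
        with cond:
            cond.wait_for(lambda: finished)  # wait until the list is non-empty
            batch = finished[:]
            finished.clear()
        return batch

    return async_results, wait_some


if __name__ == "__main__":
    with Pool(10) as pool:
        async_results, wait_some = submit_all(pool, worker, [(i,) for i in range(10)])
        remaining = len(async_results)
        while remaining:
            batch = wait_some()
            remaining -= len(batch)
            print(f"{len(batch)} task(s) completed")
            for i in batch:
                # get() waits until the result is available, then returns it
                # (or re-raises the exception raised in the worker).
                print(f"Worker {async_results[i].get()} finished")
```

The same helper works unchanged with multiprocessing.pool.ThreadPool, since it only relies on apply_async with callback and error_callback.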

Note

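The callback function my_callback sets the event once for each completed task (that is, 10 times in this example). If an iteration of the while async_results: loop finds that several tasks have completed together, the event may already have been set again on behalf of one or more of those tasks, which were completed and processed in that same iteration. The next iteration then wakes up immediately but finds no newly completed task, so tasks_completed can be 0. That is why the code only prints when tasks_completed is non-zero.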
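The sequence described in the note can be replayed deterministically in a single thread. This is only a simulation of the timing, assuming each set() call stands in for my_callback firing; it does not use the pool:

```python
from threading import Event

event = Event()

event.set()    # task A's callback fires
event.wait()   # the main thread wakes up...
event.clear()  # ...and clears the event before scanning the AsyncResults
event.set()    # task B's callback fires while the scan is still running;
               # suppose the scan then finds both A and B ready and
               # processes them in this same iteration

# Next iteration: the event is still set on B's behalf, so wait() returns at
# once even though no unprocessed task has finished (tasks_completed == 0).
spurious_wakeup = event.wait(timeout=0)
print(spurious_wakeup)  # True
```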