How do I wait for ThreadPoolExecutor.map to finish?

I have the following code, which has been simplified:

import concurrent.futures

pool = concurrent.futures.ThreadPoolExecutor(8)

def _exec(x):
    return x + x

myfuturelist = pool.map(_exec, range(5))

# How do I wait for my futures to finish?

for result in myfuturelist:
    # Is this how it's done?
    print(result)

#... stuff that should happen only after myfuturelist is
#completely resolved.
# Documentation says pool.map is asynchronous

The documentation on ThreadPoolExecutor.map is thin. Any help would be great.

Thanks!

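Executor.map runs the jobs in parallel and returns an iterator over the results. Iterating over it waits for each future to finish and yields the results in input order, so the waiting is already done for you when you consume the iterator. If you set a timeout, the iterator raises TimeoutError when a result is not available within timeout seconds of the original call to map.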

From the documentation, map(func, *iterables, timeout=None, chunksize=1) is similar to the built-in map(func, *iterables) except:

  • the iterables are collected immediately rather than lazily;
  • func is executed asynchronously and several calls to func may be made concurrently.
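
A minimal sketch of the timeout behavior, assuming a hypothetical slow_double task that sleeps longer than the timeout. The names slow_double and timed_out are illustrative and not part of the question. Note that the exception surfaces while iterating over the results, not at the call to map:

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FuturesTimeoutError

def slow_double(x):
    # Hypothetical task: takes longer than the timeout used below.
    time.sleep(0.5)
    return x + x

timed_out = False
with ThreadPoolExecutor(max_workers=2) as pool:
    # map() itself returns immediately; the timeout clock starts here.
    results = pool.map(slow_double, [1, 2], timeout=0.1)
    try:
        for value in results:   # waiting for each result happens here
            print(value)
    except FuturesTimeoutError:
        timed_out = True
        print("result not ready within the timeout")
```

The with block still waits for the tasks that are already running when it exits, so the timeout bounds how long you wait for results, not how long the workers run.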

To get a list of futures and wait on them manually, you can use:

myfuturelist = [pool.submit(_exec, x) for x in range(5)]

Executor.submit returns a Future object. Calling result() on the future explicitly waits for it to finish:

myfuturelist[0].result()  # wait for the first future to finish and return its result
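
Putting those pieces together, a self-contained sketch might look like this. It reuses _exec and the pool size from the question; the results name is an assumption added for illustration:

```python
from concurrent.futures import ThreadPoolExecutor

def _exec(x):
    return x + x

pool = ThreadPoolExecutor(8)
myfuturelist = [pool.submit(_exec, x) for x in range(5)]

# result() blocks until that particular future is done, so once this
# comprehension finishes, every submitted task has completed.
results = [future.result() for future in myfuturelist]
print(results)  # [0, 2, 4, 6, 8]

pool.shutdown()
```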

The call to ThreadPoolExecutor.map does not block until all of its tasks are complete. Use wait to do this.

from concurrent.futures import wait, ALL_COMPLETED
...

futures = [pool.submit(fn, args) for args in arg_list]
wait(futures, timeout=whatever, return_when=ALL_COMPLETED)  # ALL_COMPLETED is actually the default
do_other_stuff()
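
For a runnable version of the snippet above, here is a sketch in which fn, arg_list, and the 5-second timeout are placeholder assumptions chosen for illustration. wait returns a named tuple of two sets, the futures that completed and those that did not:

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

def fn(seconds):
    # Placeholder task: sleep for the given time, then return it.
    time.sleep(seconds)
    return seconds

arg_list = [0.05, 0.1, 0.15]

with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(fn, args) for args in arg_list]
    # Blocks until every future is done or the timeout expires.
    done, not_done = wait(futures, timeout=5)

print(len(done), len(not_done))  # 3 0
```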

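You can also call list(results) on the generator returned by pool.map to force evaluation (which is what you are doing in your original example). If you are not actually using the values returned from the tasks, though, wait is the way to go.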
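
As a rough sketch using the _exec function from the question, wrapping the call in list() blocks until every result is available:

```python
import concurrent.futures

def _exec(x):
    return x + x

pool = concurrent.futures.ThreadPoolExecutor(8)

# list() consumes the iterator, which waits for each task in turn.
results = list(pool.map(_exec, range(5)))
print(results)  # [0, 2, 4, 6, 8]

pool.shutdown()
```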

It is true that Executor.map() does not wait for all futures to finish, because it returns a lazy iterator, as @MisterMiyagi said.

But we can get the same effect by using with:

import time

from concurrent.futures import ThreadPoolExecutor

def hello(i):
    time.sleep(i)
    print(i)

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.map(hello, [1, 2, 3])
print("finish")

# output
# 1
# 2
# 3
# finish

As you can see, finish is printed after 1, 2, 3. This works because Executor has an __exit__() method:

def __exit__(self, exc_type, exc_val, exc_tb):
    self.shutdown(wait=True)
    return False

The shutdown method of ThreadPoolExecutor is:

def shutdown(self, wait=True, *, cancel_futures=False):
    with self._shutdown_lock:
        self._shutdown = True
        if cancel_futures:
            # Drain all work items from the queue, and then cancel their
            # associated futures.
            while True:
                try:
                    work_item = self._work_queue.get_nowait()
                except queue.Empty:
                    break
                if work_item is not None:
                    work_item.future.cancel()

        # Send a wake-up to prevent threads calling
        # _work_queue.get(block=True) from permanently blocking.
        self._work_queue.put(None)
    if wait:
        for t in self._threads:
            t.join()
shutdown.__doc__ = _base.Executor.shutdown.__doc__

So, by using with, we get the ability to wait until all futures have finished.
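
One caveat worth sketching: if the iterator returned by map is discarded, an exception raised inside a task is stored on its future and never re-raised. The checked_double function and the caught variable below are hypothetical names used only for this illustration:

```python
from concurrent.futures import ThreadPoolExecutor

def checked_double(x):
    # Hypothetical task that fails on negative input.
    if x < 0:
        raise ValueError(f"negative input: {x}")
    return x + x

# Results discarded: the with block waits for the tasks, but the
# ValueError never reaches the caller.
with ThreadPoolExecutor(max_workers=2) as executor:
    executor.map(checked_double, [1, -1, 2])
print("finish")

# Results consumed: iterating re-raises the task's exception.
caught = None
with ThreadPoolExecutor(max_workers=2) as executor:
    try:
        values = list(executor.map(checked_double, [1, -1, 2]))
    except ValueError as exc:
        caught = exc
print(caught)  # negative input: -1
```

So if the tasks can fail, it is probably safer to consume the results inside the with block even when the return values themselves are not needed.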