How do I wait for ThreadPoolExecutor.map to finish?
I have the following code, which has been simplified:
import concurrent.futures
pool = concurrent.futures.ThreadPoolExecutor(8)
def _exec(x):
    return x + x
myfuturelist = pool.map(_exec,[x for x in range(5)])
# How do I wait for my futures to finish?
for result in myfuturelist:
    # Is this how it's done?
    print(result)
#... stuff that should happen only after myfuturelist is
#completely resolved.
# Documentation says pool.map is asynchronous
The documentation on ThreadPoolExecutor.map is weak. Help would be great.
Thanks!
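Executor.map runs the jobs in parallel and returns a generator over the results. Iterating over that generator waits for each future to finish, so the loop in your example already does the waiting for you. If you set a timeout, the generator raises a TimeoutError once the timeout expires without the next result being available.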
map(func, *iterables, timeout=None, chunksize=1)
- the iterables are collected immediately rather than lazily;
- func is executed asynchronously and several calls to func may be made concurrently.
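To make the behavior described above concrete, here is a minimal sketch. The task function slow_double and the 0.2 second sleep are assumptions chosen for illustration, not part of the original question. It suggests that the call to map returns right away, while consuming the returned iterator is what blocks, and that results come back in input order.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_double(x):
    # Hypothetical task: sleep briefly, then return twice the input.
    time.sleep(0.2)
    return x + x

with ThreadPoolExecutor(max_workers=4) as pool:
    start = time.monotonic()
    results = pool.map(slow_double, range(4))  # all tasks are submitted here
    submit_elapsed = time.monotonic() - start  # map itself did not wait

    collected = list(results)                  # blocks until every result is ready
    total_elapsed = time.monotonic() - start

print(collected)
print(f"map returned after {submit_elapsed:.3f}s, results ready after {total_elapsed:.3f}s")
```

The list is built in the order of the inputs, regardless of which task happens to finish first.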
To get a list of futures and do the wait manually, you can use:
myfuturelist = [pool.submit(_exec, x) for x in range(5)]
Executor.submit returns a future object, and calling result on that future explicitly waits for it to finish:
myfuturelist[0].result() # wait for the 1st future to finish and return the result
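Putting the two snippets above together, a self-contained sketch might look like the following. It reuses _exec and the pool size from the question; the names results and all_done are introduced here only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

def _exec(x):
    return x + x

pool = ThreadPoolExecutor(8)

# submit() returns one Future per call without waiting for the task.
myfuturelist = [pool.submit(_exec, x) for x in range(5)]

# result() blocks until that particular future has finished.
results = [f.result() for f in myfuturelist]

# Every future has been waited on, so all of them report done.
all_done = all(f.done() for f in myfuturelist)

pool.shutdown()
print(results)   # [0, 2, 4, 6, 8]
print(all_done)  # True
```

Calling result() also re-raises any exception the task raised, so this form is convenient when failures should surface in the calling thread.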
The call to ThreadPoolExecutor.map does not block until all of its tasks are complete. Use wait to do this.
from concurrent.futures import wait, ALL_COMPLETED
...
futures = [pool.submit(fn, args) for args in arg_list]
wait(futures, timeout=whatever, return_when=ALL_COMPLETED) # ALL_COMPLETED is actually the default
do_other_stuff()
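You could also call list(results) on the generator returned by pool.map to force the evaluation (which is effectively what you're doing in your original example). If you're not actually using the values returned from the tasks, though, wait is the way to go.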
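As a runnable version of the wait approach, here is a small sketch for tasks that are run only for their side effects. The record function, the side_effects list, and the 10 second timeout are assumptions made for this example. wait returns two sets, the futures that finished and those that did not.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

side_effects = []

def record(x):
    # Hypothetical task that matters only for its side effect.
    time.sleep(0.05)
    side_effects.append(x)  # list.append is thread-safe in CPython

pool = ThreadPoolExecutor(4)
futures = [pool.submit(record, x) for x in range(5)]

# Block until every future has finished (or the timeout expires).
done, not_done = wait(futures, timeout=10, return_when=ALL_COMPLETED)

pool.shutdown()
print(len(done), len(not_done), sorted(side_effects))
```

Note that wait does not raise exceptions from the tasks themselves; to check for failures, inspect each future with exception() or result() afterwards.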
It's true that Executor.map() does not wait for all futures to finish, because it returns a lazy iterator, as @MisterMiyagi said.
But we can accomplish this by using with:
import time
from concurrent.futures import ThreadPoolExecutor
def hello(i):
    time.sleep(i)
    print(i)
with ThreadPoolExecutor(max_workers=2) as executor:
    executor.map(hello, [1, 2, 3])
print("finish")
# output
# 1
# 2
# 3
# finish
As you can see, finish is printed after 1,2,3. It works because Executor has an __exit__() method, whose code is
def __exit__(self, exc_type, exc_val, exc_tb):
    self.shutdown(wait=True)
    return False
The shutdown method of ThreadPoolExecutor is
def shutdown(self, wait=True, *, cancel_futures=False):
    with self._shutdown_lock:
        self._shutdown = True
        if cancel_futures:
            # Drain all work items from the queue, and then cancel their
            # associated futures.
            while True:
                try:
                    work_item = self._work_queue.get_nowait()
                except queue.Empty:
                    break
                if work_item is not None:
                    work_item.future.cancel()
        # Send a wake-up to prevent threads calling
        # _work_queue.get(block=True) from permanently blocking.
        self._work_queue.put(None)
    if wait:
        for t in self._threads:
            t.join()
shutdown.__doc__ = _base.Executor.shutdown.__doc__
So by using with, we gain the ability to wait until all futures are finished.
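One caveat worth illustrating with the with approach: leaving the block waits for the tasks, but an exception raised inside a task only surfaces when the iterator returned by map is consumed. The sketch below uses a hypothetical risky function that fails for one input; the names collected and error are also introduced just for this example.

```python
from concurrent.futures import ThreadPoolExecutor

def risky(i):
    # Hypothetical task that fails for one particular input.
    if i == 2:
        raise ValueError("task 2 failed")
    return i * 10

with ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(risky, [1, 2, 3])
# Leaving the with block waited for all tasks, but nothing was raised here.

collected = []
error = None
try:
    for value in results:  # results arrive in input order
        collected.append(value)
except ValueError as exc:  # the task's exception is re-raised on iteration
    error = exc

print(collected, error)  # [10] task 2 failed
```

If the iterator is never consumed, as in the hello example above, a failing task would go unnoticed, so it may be safer to iterate over the results whenever errors matter.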