Python:让 concurrent.futures 执行器等待 done_callbacks 完成

Python: Getting a concurrent.futures Executor to wait for done_callbacks to complete

是否可以让 ThreadPoolExecutor 等待其所有 future 及其 add_done_callback() 函数完成而无需调用 .shutdown(wait=True)?下面的代码片段说明了我试图完成的本质,即在外循环迭代之间重用线程池。

from concurrent.futures import ThreadPoolExecutor, wait
import time

def proc_func(n):
    return n + 1


def create_callback_func(fid, sleep_time):
    def callback(future):
        time.sleep(sleep_time)
        fid.write(str(future.result()))
        return

    return callback


num_workers = 4
num_files_write = 3
num_tasks = 8
sleep_time = 1

pool = ThreadPoolExecutor(max_workers=num_workers)

for n in range(num_files_write):
    fid = open(f'test{n}.txt', 'w')
    futs = []

    callback_func = create_callback_func(fid, sleep_time)

    for t in range(num_tasks):
        fut = pool.submit(proc_func, n)
        fut.add_done_callback(callback_func)
        futs.append(fut)

    wait(futs)
    fid.close()

pool.shutdown(wait=True)

运行 此代码抛出一堆 ValueError: I/O operation on closed file. 并且写入的三个文件具有以下内容:
test0.txt -> 1111
test1.txt -> 2222
test3.txt -> 3333

显然这是错误的,每个数字应该有八个。如果我为每个文件创建并关闭一个单独的 ThreadPoolExecutor,则会获得正确的结果。所以我知道 Executor 能够正确等待所有回调完成,但我可以告诉它这样做而不关闭它吗?

恐怕无法完成,您正在“滥用”回调。

回调的主要目的是通知预定的工作已经完成。

内部未来状态为 PENDING -> 运行 -> FINISHED(为简洁起见忽略取消)。当达到 FINISHED 状态时,将调用回调,但当它们完成时没有下一个状态。这就是为什么无法与该事件同步的原因。

在可用线程之一中执行提交函数的核心是(简化):

try:
    result = self.fn(*self.args, **self.kwargs)
except BaseException as exc:
     self.future.set_exception(exc)
else:
    self.future.set_result(result)

set_exceptionset_result 看起来像这样(非常简单):

... save the result/exception
self._state = FINISHED
... wakeup all waiters
self._invoke_callbacks() # this is the last statement

未来处于完成状态,即调用“完成”回调时处于“完成”状态。在将工作标记为完成之前通知工作已完成是没有意义的。

正如您已经注意到的,在您的代码中:

wait(futs)
fid.close()

waitreturns,文件关闭,但回调尚未完成,无法尝试写入已关闭的文件。


第二个问题是为什么shutdown(wait=True)有效?仅仅是因为它等待所有线程:

if wait:
    for t in self._threads:
        t.join()

那些线程也执行回调(见上面的代码片段)。这就是为什么回调执行必须在线程结束时结束。