如何确保 concurrent.futures 的迭代器中每个 Future 的超时?

How to ensure a timeout per each Future in an iterator of concurrent.futures?

documentation 围绕 concurrent.futures 的超时非常难以理解。在一个简单的例子中,我想通过在扫描作业函数列表的循环中调用 .submit 来使用 ProcessPoolExecutor。我希望这些 Future 对象中的每一个都有 10 分钟的关联超时,否则它们将异步完成。

我的第一个方法是尝试使用 as_completed 函数,该函数生成 Future 对象的迭代器,并且仅在完成时生成下一个对象。 as_completed 接受一个 timeout 参数,但文档说这个超时是相对于 as_completed 被调用的第一时刻,而不一定是任何 Future 对象本身的生命周期。

例如假设 ProcessPoolExecutor 只有 3 个工作进程,但 Future 对象列表包含 10 个项目。在处理前 3 个项目时,其中 7 个项目可能处于未处理状态长达 10 分钟。此后不久,as_completed 的超时将被触发导致失败,即使每个人 Future 可能已经达到了 10 分钟的限制。

请注意,适用于 as_completed 的相同限制也适用于 wait,并且 wait 更难用于此用例,因为 return它支持的选项。

我的下一个想法是使用 timeout parameter that future.result allows 并为我的期货列表中的每个 f(未来)调用 f.result(timeout=600)。但是,没有真正以阻塞方式要求结果的方式来设置此超时。如果您迭代期货列表并调用 f.result(...),此调用会在指定的超时时间内阻塞。

另一方面,您也不能将 f.resultas_completed 组合在一起,以一种天真但看似正确的方式,如

[f.result(timeout=600) for f in as_completed(futures_list)]

... 因为 as_completed 的迭代在 futures 完成时欺骗性地异步等待,并且只有 return 使它们 .result 在 [= 之后被调用 59=] 他们已经完成了。

鉴于此,生成 Future 列表的正确模式是什么,其中每个列表都有自己的超时时间,然后异步等待它们完成?

似乎无法在这种异步上下文中提供每个 Future 超时。可用的 API 函数 waitas_completed 通过支持 Future 对象的可迭代中所有任务的全局超时来走更容易的路,并且不尝试测量时间当Future第一次开始主动处于正在处理的状态时

我选择了一种解决方法,将我的任务列表分成一组块,并对每个块使用 as_completed。块大小设置为与我的 ProcessPoolExecutor 配置使用的工作人员数量相同,因此我可以确定 as_completed 的 "global" 超时正在秘密运行每个 Future 超时,因为所有任务都会立即主动处理。缺点是利用率较低,因为当任务提早完成时,进程池不能空闲以获取下一个 Future 任务;它必须等待整个下一批任务。对我来说这没问题,但这是我必须选择的 concurrent.futures 的重大可用性缺陷。

这是一些示例代码。假设 my_task_list 已经包含通过 functools.partial 或其他方式绑定的部分或全部必需参数的函数。您可以修改它,以便在元组或字典的单独迭代中提供参数,并根据需要传递到 submit

my_task_list = #... define your list of task functions
num_workers = #... set number of workers
my_timeout = #... define your timeout
with ProcessPoolExecutor(max_workers=num_workers) as pool:
    all_results = []
    for chunk_start in range(0, len(my_task_list), num_workers):
        chunk = my_task_list[chunk_start:chunk_start + num_workers]
        # could extract parameters to pass for this task chunk here.
        futures = [pool.submit(task) for task in chunk]
        all_results += [
            f.result() for f in as_completed(futures, timeout=my_timeout)
        ]
    return all_results

请注意,如果您选择的 num_workers 多于 ProcessPoolExecutor 可用的处理器数量,您最终会得到比给定块中的处理器更多的任务,并且 return as_completed 的超时无法正确应用于每个任务的运行时的情况,可能会导致与仅在 as_completedwait没有分块的整体任务列表。