如何确保 concurrent.futures 的迭代器中每个 Future 的超时?
How to ensure a timeout per each Future in an iterator of concurrent.futures?
documentation 围绕 concurrent.futures
的超时非常难以理解。在一个简单的例子中,我想通过在扫描作业函数列表的循环中调用 .submit
来使用 ProcessPoolExecutor
。我希望这些 Future
对象中的每一个都有 10 分钟的关联超时,否则它们将异步完成。
我的第一个方法是尝试使用 as_completed
函数,该函数生成 Future 对象的迭代器,并且仅在完成时生成下一个对象。 as_completed
接受一个 timeout
参数,但文档说这个超时是相对于 as_completed
被调用的第一时刻,而不一定是任何 Future
对象本身的生命周期。
例如假设 ProcessPoolExecutor
只有 3 个工作进程,但 Future
对象列表包含 10 个项目。在处理前 3 个项目时,其中 7 个项目可能处于未处理状态长达 10 分钟。此后不久,as_completed
的超时将被触发导致失败,即使每个人 Future
可能已经达到了 10 分钟的限制。
请注意,适用于 as_completed
的相同限制也适用于 wait
,并且 wait
更难用于此用例,因为 return它支持的选项。
我的下一个想法是使用 timeout
parameter that future.result
allows 并为我的期货列表中的每个 f
(未来)调用 f.result(timeout=600)
。但是,没有真正以阻塞方式要求结果的方式来设置此超时。如果您迭代期货列表并调用 f.result(...)
,此调用会在指定的超时时间内阻塞。
另一方面,您也不能将 f.result
与 as_completed
组合在一起,以一种天真但看似正确的方式,如
[f.result(timeout=600) for f in as_completed(futures_list)]
... 因为 as_completed
的迭代在 futures 完成时欺骗性地异步等待,并且只有 return 使它们 .result
在 [= 之后被调用 59=] 他们已经完成了。
鉴于此,生成 Future
列表的正确模式是什么,其中每个列表都有自己的超时时间,然后异步等待它们完成?
似乎无法在这种异步上下文中提供每个 Future 超时。可用的 API 函数 wait
和 as_completed
通过支持 Future
对象的可迭代中所有任务的全局超时来走更容易的路,并且不尝试测量时间当Future
第一次开始主动处于正在处理的状态时
我选择了一种解决方法,将我的任务列表分成一组块,并对每个块使用 as_completed
。块大小设置为与我的 ProcessPoolExecutor
配置使用的工作人员数量相同,因此我可以确定 as_completed
的 "global" 超时正在秘密运行每个 Future 超时,因为所有任务都会立即主动处理。缺点是利用率较低,因为当任务提早完成时,进程池不能空闲以获取下一个 Future 任务;它必须等待整个下一批任务。对我来说这没问题,但这是我必须选择的 concurrent.futures
的重大可用性缺陷。
这是一些示例代码。假设 my_task_list
已经包含通过 functools.partial
或其他方式绑定的部分或全部必需参数的函数。您可以修改它,以便在元组或字典的单独迭代中提供参数,并根据需要传递到 submit
。
my_task_list = #... define your list of task functions
num_workers = #... set number of workers
my_timeout = #... define your timeout
with ProcessPoolExecutor(max_workers=num_workers) as pool:
all_results = []
for chunk_start in range(0, len(my_task_list), num_workers):
chunk = my_task_list[chunk_start:chunk_start + num_workers]
# could extract parameters to pass for this task chunk here.
futures = [pool.submit(task) for task in chunk]
all_results += [
f.result() for f in as_completed(futures, timeout=my_timeout)
]
return all_results
请注意,如果您选择的 num_workers
多于 ProcessPoolExecutor
可用的处理器数量,您最终会得到比给定块中的处理器更多的任务,并且 return as_completed
的超时无法正确应用于每个任务的运行时的情况,可能会导致与仅在 as_completed
或 wait
没有分块的整体任务列表。
documentation 围绕 concurrent.futures
的超时非常难以理解。在一个简单的例子中,我想通过在扫描作业函数列表的循环中调用 .submit
来使用 ProcessPoolExecutor
。我希望这些 Future
对象中的每一个都有 10 分钟的关联超时,否则它们将异步完成。
我的第一个方法是尝试使用 as_completed
函数,该函数生成 Future 对象的迭代器,并且仅在完成时生成下一个对象。 as_completed
接受一个 timeout
参数,但文档说这个超时是相对于 as_completed
被调用的第一时刻,而不一定是任何 Future
对象本身的生命周期。
例如假设 ProcessPoolExecutor
只有 3 个工作进程,但 Future
对象列表包含 10 个项目。在处理前 3 个项目时,其中 7 个项目可能处于未处理状态长达 10 分钟。此后不久,as_completed
的超时将被触发导致失败,即使每个人 Future
可能已经达到了 10 分钟的限制。
请注意,适用于 as_completed
的相同限制也适用于 wait
,并且 wait
更难用于此用例,因为 return它支持的选项。
我的下一个想法是使用 timeout
parameter that future.result
allows 并为我的期货列表中的每个 f
(未来)调用 f.result(timeout=600)
。但是,没有真正以阻塞方式要求结果的方式来设置此超时。如果您迭代期货列表并调用 f.result(...)
,此调用会在指定的超时时间内阻塞。
另一方面,您也不能将 f.result
与 as_completed
组合在一起,以一种天真但看似正确的方式,如
[f.result(timeout=600) for f in as_completed(futures_list)]
... 因为 as_completed
的迭代在 futures 完成时欺骗性地异步等待,并且只有 return 使它们 .result
在 [= 之后被调用 59=] 他们已经完成了。
鉴于此,生成 Future
列表的正确模式是什么,其中每个列表都有自己的超时时间,然后异步等待它们完成?
似乎无法在这种异步上下文中提供每个 Future 超时。可用的 API 函数 wait
和 as_completed
通过支持 Future
对象的可迭代中所有任务的全局超时来走更容易的路,并且不尝试测量时间当Future
第一次开始主动处于正在处理的状态时
我选择了一种解决方法,将我的任务列表分成一组块,并对每个块使用 as_completed
。块大小设置为与我的 ProcessPoolExecutor
配置使用的工作人员数量相同,因此我可以确定 as_completed
的 "global" 超时正在秘密运行每个 Future 超时,因为所有任务都会立即主动处理。缺点是利用率较低,因为当任务提早完成时,进程池不能空闲以获取下一个 Future 任务;它必须等待整个下一批任务。对我来说这没问题,但这是我必须选择的 concurrent.futures
的重大可用性缺陷。
这是一些示例代码。假设 my_task_list
已经包含通过 functools.partial
或其他方式绑定的部分或全部必需参数的函数。您可以修改它,以便在元组或字典的单独迭代中提供参数,并根据需要传递到 submit
。
my_task_list = #... define your list of task functions
num_workers = #... set number of workers
my_timeout = #... define your timeout
with ProcessPoolExecutor(max_workers=num_workers) as pool:
all_results = []
for chunk_start in range(0, len(my_task_list), num_workers):
chunk = my_task_list[chunk_start:chunk_start + num_workers]
# could extract parameters to pass for this task chunk here.
futures = [pool.submit(task) for task in chunk]
all_results += [
f.result() for f in as_completed(futures, timeout=my_timeout)
]
return all_results
请注意,如果您选择的 num_workers
多于 ProcessPoolExecutor
可用的处理器数量,您最终会得到比给定块中的处理器更多的任务,并且 return as_completed
的超时无法正确应用于每个任务的运行时的情况,可能会导致与仅在 as_completed
或 wait
没有分块的整体任务列表。