Dask:as_completed 的 asyncio 等价物是什么?

Dask : what is the asyncio equivalent of as_completed?

我有一个像这样的 Dask 客户端代码:

client = Client(address=self.cluster)

futures = []

for job in jobs:

   future = client.submit(...)
   futures.append(future)

for future, result in as_completed(futures, with_results=True, raise_errors=True):

   key = future.key
   state = (State.FINISHED if result is True else State.FAILED)
   
   ...

Dask as_completed 函数是相关的,因为它迭代已完成良好订单的作业。

该代码的问题是它可能会无限期地阻止 as_completed 调用,例如,如果工作人员不可用。

有没有办法用 asyncio 重写它?事实上,对于 asyncio,我可以使用带有超时的 wait 函数,以便在出现错误时解除阻塞调用。

谢谢

您可以使用 asyncio.as_completed https://docs.python.org/3/library/asyncio-task.html