带有上下文管理器的 dask 集群

dask clusters with context manager

考虑这样一个简单的工作流程:

from dask.distributed import Client
import time

with Client() as client:
    futs = client.map(time.sleep, list(range(10)))

上面的代码将提交并几乎立即取消期货,因为上下文管理器将关闭。可以让上下文管理器保持打开状态,直到使用 client.gather 完成任务,但是这将阻止在当前进程中进一步执行。

我有兴趣在同一进程中将任务提交到多个集群(例如本地和分布式),理想情况下不阻塞当前进程。明确定义不同的客户端和集群很简单,但是否也可以使用上下文管理器(每个唯一的 client/cluster)?

这听起来有点反模式,但也许只有在计算完所有 futures 之后才能关闭集群 运行。我试过 fire_and_forget 也试过 shutdown_on_close=False,但似乎没有实现。

对于某些 Dask cluster/scheduler 类型,例如 dask-cloudprovider ECSCluster,上述使用 with 块和 shutdown_on_close=False 的方法可以正常工作.

ECSClusterSLURMCluster 都来自 SpecCluster。但是,ECSCluster 通过以下调用将其 **kwargs(包括 shutdown_on_close)传递给 SpecCluster 构造函数:

super().__init__(**kwargs)

(参见 ECSCluster 代码 here

SLURMCluster 不会:它调用 JobQueueCluster 构造函数,后者又仅使用其参数的一个子集实例化 SpecCluster

super().__init__(
    scheduler=scheduler,
    worker=worker,
    loop=loop,
    security=security,
    silence_logs=silence_logs,
    asynchronous=asynchronous,
    name=name,
)

参见JobQueueCluster代码here

因此 SLURMCluster/JobQueueCluster 忽略 shutdown_on_close(和其他可选参数)。看起来您的用例需要更新 JobQueueCluster