如何在 Dask 中停止 运行 任务?

How do I stop a running task in Dask?

当使用 Dask 的分布式调度程序时,我有一个远程工作人员 运行 的任务要停止。

我该如何阻止它?我知道取消方法,但如果任务已经开始执行,这似乎不起作用。

如果还没有运行

如果任务还没有开始运行你可以通过取消关联的未来来取消它

future = client.submit(func, *args)  # start task
future.cancel()                      # cancel task

如果您正在使用 dask 集合,那么您可以使用 client.cancel 方法

x = x.persist()   # start many tasks 
client.cancel(x)  # cancel all tasks

如果是运行

但是,如果您的任务已经在工作线程中的某个线程上启动 运行,那么您无法中断该线程。不幸的是,这是 Python 的限制。

建立明确的停止条件

您最好的办法是使用您自己的自定义逻辑在您的函数中构建某种停止条件。您可能会考虑在循环中检查共享变量。在这些文档中查找 "Variable":http://dask.pydata.org/en/latest/futures.html

from dask.distributed import Client, Variable

client = Client()
stop = Varible()
stop.put(False)

def long_running_task():
    while not stop.get():
        ... do stuff

future = client.submit(long_running_task)

... wait a while

stop.put(True)