如何获取有关特定 Dask 任务的信息

How to get information about a particular Dask task

我 运行 遇到了一个问题,我的分布式集群似乎 "hang" - 例如任务停止处理,因此积压了未处理的任务,所以我正在寻找一些方法来帮助调试正在发生的事情。

Client 上有 processing 方法,它会告诉我每个工作人员当前 运行 有哪些任务,但 AFAICS 这是 [=] 上可用任务的唯一信息10=] 对象?

我想要的是不仅可以查询处理任务,还可以查询所有任务,包括已处理、正在处理和出错,并且每个任务都可以获得一些统计信息,例如 submitted_timecompletion_time 这将使我能够找出哪些任务正在阻止集群。

这类似于 ipyparallel.AsyncResult

上的扩展元数据

如果能够获得任何给定任务的 args/kwargs,那真是太好了。这对调试失败的任务特别有帮助。

目前是否有任何此功能可用,或者是否有任何方法可以获取我想要的信息?

非常欢迎任何其他关于如何调试问题的建议。

截至 2017 年 5 月,不存在明确的 "give me all of the information about a task" 操作。但是,您可以使用客户端直接调查任务状态。这将需要您深入了解调度程序和工作程序跟踪的信息。请参阅以下文档页面:

要查询此状态,我将使用 Client.run_on_scheduler and Client.run 方法。这些分别在调度程序或工作程序上采用 运行 的功能。如果此函数包含 dask_schedulerdask_worker 参数,则该函数将被赋予调度程序或工作对象本身。

def f(dask_scheduler):
    return dask_scheduler.task_state

client.run_on_scheduler(f)

您现在可以检查调度程序或工作人员知道的任何状态,以及 运行 任何内部诊断检查。不过,您选择调查的内容完全取决于您的用例。

def f(keys, dask_scheduler=None):
    return dask_scheduler.transition_story(*keys)

client.run_on_scheduler(f, [key1, key2, key3])