Dask - 如何取消和重新提交停滞的任务?

Dask - How to cancel and resubmit stalled tasks?

我经常遇到 Dask 在几个任务上随机停止的问题,通常与从我网络上的不同节点读取数据有关(下面有更多详细信息)。这可能会在 运行 正常运行脚本几个小时后发生。它将以如下所示的形式无限期挂起(否则此循环需要几秒钟才能完成):

在这种情况下,我看到只有少数停滞的进程,并且都在一个特定节点 (192.168.0.228) 上:

此节点上的每个工作人员都在几个 read_parquet 任务上停滞不前:

这是使用以下代码调用的,并且使用的是 fastparquet:

ddf = dd.read_parquet(file_path, columns=['col1', 'col2'], index=False, gather_statistics=False)

我的集群是 运行ning Ubuntu 19.04 和所有最新版本(截至 11/12)的 Dask 和 Distributed 以及所需的包(例如,tornado、fsspec、fastparquet 等) .)

.228 节点试图访问的数据位于我集群中的另一个节点上。 .228 节点通过 CIFS 文件共享访问数据。我 运行 Dask 调度程序在我 运行 运行脚本的同一节点上(不同于 .228 节点和数据存储节点)。该脚本使用 Paramiko 通过 SSH 将工作人员连接到调度程序:

ssh_client = paramiko.SSHClient()
stdin, stdout, stderr = ssh_client.exec_command('sudo dask-worker ' +
                                                            ' --name ' + comp_name_decode +
                                                            ' --nprocs ' + str(nproc_int) +
                                                            ' --nthreads 10 ' +
                                                            self.dask_scheduler_ip, get_pty=True)  

.228 节点与调度程序和数据存储节点的连接看起来都很健康。 .228 节点可能在尝试处理 read_parquet 任务时遇到某种短暂的连接问题,但如果发生这种情况,.228 节点与调度程序和 CIFS 共享的连接不会受到超出此范围的影响短暂的一刻。在任何情况下,日志都不会显示任何问题。这是 .228 节点的完整日志:

distributed.worker - INFO - Start worker at: tcp://192.168.0.228:42445

distributed.worker - INFO - Listening to: tcp://192.168.0.228:42445

distributed.worker - INFO - dashboard at: 192.168.0.228:37751

distributed.worker - INFO - Waiting to connect to: tcp://192.168.0.167:8786

distributed.worker - INFO - -------------------------------------------------

distributed.worker - INFO - Threads: 2

distributed.worker - INFO - Memory: 14.53 GB

distributed.worker - INFO - Local Directory: /home/dan/worker-50_838ig

distributed.worker - INFO - -------------------------------------------------

distributed.worker - INFO - Registered to: tcp://192.168.0.167:8786

distributed.worker - INFO - -------------------------------------------------

撇开这是 Dask 中的错误还是我的 code/network 中的错误,是否可以为调度程序处理的所有任务设置一般超时?或者,是否可以:

  1. 识别停滞的任务,
  2. 复制停滞的任务并将其移至另一个工作人员,并且
  3. 取消暂停的任务?

is it possible to set a general timeout for all tasks handled by the scheduler?

截至 2019-11-13 不幸的是,答案是否定的。

如果任务正常失败,那么您可以使用 client.retry(...) 重试该任务,但没有自动方法让任务在特定时间后自行失败。这是您必须自己写入 Python 函数的内容。不幸的是,很难在另一个线程中中断 Python 函数,这也是未实现此功能的部分原因。

如果 worker 宕机,那么事情会在别处尝试。然而,从你所说的听起来一切都很健康,只是任务本身可能会永远持续下去。不幸的是,很难将其确定为失败案例。