本地集群上的 Dask 数据加载:"Worker exceeded 95% memory budget"。重新启动然后 "KilledWorker"

Dask data loading on local cluster: "Worker exceeded 95% memory budget". Restarting and then "KilledWorker"

我知道以前有人问过类似的问题,但他们的解决方案不是很有帮助。我想最好的解决方案可能更具体地针对每个集群配置,所以我在此处提供有关我的集群和我的错误的更多详细信息。

import dask.dataframe as dd
import dask.bag as db
import json

from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)

这是我的集群设置

cluster.scheduler

#输出:

Scheduler: tcp://127.0.0.1:35367 workers: 8 cores: 48 tasks: 0



cluster.workers

#输出:

{0: <Nanny: tcp://127.0.0.1:43789, threads: 6>,
 1: <Nanny: tcp://127.0.0.1:41375, threads: 6>,
 2: <Nanny: tcp://127.0.0.1:42577, threads: 6>,
 3: <Nanny: tcp://127.0.0.1:40171, threads: 6>,
 4: <Nanny: tcp://127.0.0.1:32867, threads: 6>,
 5: <Nanny: tcp://127.0.0.1:46529, threads: 6>,
 6: <Nanny: tcp://127.0.0.1:41535, threads: 6>,
 7: <Nanny: tcp://127.0.0.1:39645, threads: 6>}


client

#output

Client
Scheduler: tcp://127.0.0.1:35367
Dashboard: http://127.0.0.1:8787/status
Cluster
Workers: 8
Cores: 48
Memory: 251.64 GiB

这是我的数据加载代码:

b = db.read_text('2019-12-16-latest-level.json').map(json.loads)

def flatten(record):
    return {
        'uuid': record['uuid'],
        'stored_at': record['stored_at'],
        'duration': record['duration']
}

运行 以上的所有代码都可以。这是遇到麻烦的那个:

df = b.map(flatten).to_dataframe()
df.head() 

代码 运行 大约 1 天,并发出以下警告:

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker

然后大约有一天,程序停止并给我以下错误:

---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
<ipython-input-10-84f98622da69> in <module>
      1 df = b.map(flatten).to_dataframe()
----> 2 df.head()

这里是错误报告的最后几行:

KilledWorker: ("('bag-from-delayed-file_to_blocks-list-loads-flatten-0daa9cba16c635566df6215c209f653c', 0)", <WorkerState 'tcp://127.0.0.1:41535', name: 6, memory: 0, processing: 1>)

还附上完整错误报告的屏幕截图:

关于如何处理这个问题有什么建议吗?谢谢。

我已经使用 dask 大约一个月了,结果好坏参半。我个人认为,该软件在执行任务图时在其内存管理中有某种致命的拥抱。 dask 的一个典型动机是在短短几分钟内计算出 95% 的大型计算,然后在接下来的 8 小时里 c运行 慢慢地处理最后 5%,在它崩溃之前似乎什么都不做,或者我 运行 超出计算预算。这很令人沮丧。

也就是说,我使用较少的工作人员或将工作人员限制在进程而不是线程上取得了一些 有限的成功。所以,在 16 核机器上我可能会这样做:

client = Client(processes=True, threads_per_worker=1)

另一件重要的事情是明智地坚持。持久化会导致在给定时间图形中的任务(以及内存中的结果)减少。因此,如果我想从 json 文件中读取一个包,我会在转换为数据帧之前保留该包,否则读取和转换都发生在 compute() 步骤,我发现这是失败的秘诀.

然而,正如我所说,我发现 dask 非常令人失望,考虑到它表面上显示的所有功能。我改用 vaex。

抱歉没能提供更多帮助。