如何查明 Dask worker 到 worker 连接超时问题的原因?

How to pinpoint cause of Dask worker to worker connection timeout issues?

随着我的数据集变大(因此分区的数量和大小也变大),我的分布式 Dask 集群中的工作人员最终因彼此之间的连接超时而失败。

例如,我反复看到错误日志,例如(混淆了路径和 IP):

distributed.worker - ERROR - Worker stream died during communication: tcp://123.123.123.123:41076
Traceback (most recent call last):
  File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/comm/core.py", line 321, in connect
    await asyncio.wait_for(comm.write(local_info), time_left())
  File "/share/apps/python/miniconda3.8/lib/python3.8/asyncio/tasks.py", line 464, in wait_for
    raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/worker.py", line 2334, in gather_dep
    response = await get_data_from_worker(
  File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/worker.py", line 3753, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/utils_comm.py", line 385, in retry_operation
    return await retry(
  File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/utils_comm.py", line 370, in retry
    return await coro()
  File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/worker.py", line 3730, in _get_data
    comm = await rpc.connect(worker)
  File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/core.py", line 1012, in connect
    comm = await connect(
  File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/comm/core.py", line 325, in connect
    raise IOError(
OSError: Timed out during handshake while connecting to tcp://123.123.123.123:41076 after 10 s

根据记录的 IP 和端口,我知道这些是工作人员之间的连接错误,而不是调度程序的工作人员。

堆栈跟踪不包含对我的代码的任何引用(而是对我的 venv 中的 Dask)的引用,但我确实怀疑我在 Dask 中所做的某些事情导致了这个问题。较早的任务,自定义延迟和 DataFrame 函数均已成功完成。

根据在系统出现故障时查看日志和仪表板的情况,我怀疑我在 groupby 聚合中遇到了问题。我已经解决了由数据混洗引起的内存不足问题,方法是对数据进行分区,使分组数据包含在每个分区中(不需要混洗)。我已确认在仪表板中没有看到任何随机播放任务 运行。尽管记录的堆栈跟踪仍然显示一个 Dask worker 试图使用 gather_dep 函数从另一个中检索数据。

我试图更好地理解员工之间相互交流以评估我可能正在做什么导致问题的情况?

逐渐增加超时配置只会使超时时间更长。工人们最终还是在连接彼此通信时死亡。

关于如何调试问题还有其他建议吗?

此页面有助于描述 Dask groupby 聚合过程:https://saturncloud.io/docs/reference/dask_groupby_aggregations/

特别是,如果您的分区已经根据 groupby 列排序,您可以使用 map_partitions() 单独处理每个分区。请注意,分区需要足够小才能存储在内存中。我不清楚是不是整个 DataFrame 都需要在内存中,或者 Dask 是否足够小心,只需要单独在内存中的每个分区。 https://saturncloud.io/docs/reference/dask_groupby_aggregations/#use-map_partitions-instead

我已经能够使用 map_partitions 来解决我之前遇到的工作人员之间的通信和超时问题。这并不能解释为什么当 DataFrame 已经在其中一个 groupby 列上分区时 Dask 试图与其他工作人员通信,但如果您发现自己处于类似情况,它至少会让您前进。