Dask 仪表板中 100% 进度后,Dask DF 操作需要很长时间
Dask DF operation takes a long time after 100% progress in Dask dashboard
我正在使用 Jupyter 中的 Dask 处理大型 CSV(~60GB;~250M 行)。
加载 DF 后,我要做的第一件事是连接两个字符串列。我可以成功地做到这一点,但我注意到单元执行时间似乎并没有随着工作人员数量的增加而减少(我在具有 64 个逻辑内核的机器上尝试了 5、10 和 20)。如果有的话,每五个左右的工人似乎就会增加一分钟的执行时间。
与此同时,Dask 仪表板的进度条表明该任务与工人数量的比例很好。在 5 名工人的情况下,任务在大约 10-15 分钟内完成(根据仪表板)。在 20 名工人的情况下,流可视化表明任务在大约 3-5 分钟内完成。但是 cell 执行时间保持在 25 分钟左右,即在 5 名工人的情况下,cell 似乎会额外挂起 10-15 分钟。流结束后;在 20 名工人的情况下——再多 20-22 分钟,据我所知,没有工人 activity 的证据。
这是我的代码 运行:
import dask
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=20)
client = Client(cluster)
df = dd.read_csv('df_name.csv', dtype={'col1': 'object', 'col2': 'object'})
with ProgressBar():
df["col_merged"] = df["col3"]+df["col4"]
df = df.compute()
Python版本:3.9.1
Dask版本:2021.06.2
我错过了什么?这仅仅是让 Dask 协调多个工作人员的开销吗?
运行
df = df.compute()
将尝试将所有 250M 行加载到内存中。如果这对你的机器可行,你仍然会花费很多时间,因为每个工作人员都要发送他们的块,所以会有很多数据传输...
核心思想是只将减少的计算结果放入内存,并在此之前将工作量分配给工作人员。
要添加到@SultanOrazbayev 的回答中,在所有任务完成后需要时间的具体事情是将数据从工作人员复制到您的客户端进程到 assemble 单个内存数据帧你所要求的。这不是一个“任务”,因为所有的计算都已经发生了,并且不能很好地并行化,因为客户端是从工作线程中提取数据的单线程。
与上面的评论一样:如果你想实现并行性,你需要在 workers 中加载数据(dd.read_csv 这样做)并在 workers 中对它们进行操作以获得你的结果。你应该放在 .compute()
比较小的东西上。相反,如果您的数据首先轻松地进入内存,那么参与 dask 可能没有任何好处,只需使用 pandas.
我正在使用 Jupyter 中的 Dask 处理大型 CSV(~60GB;~250M 行)。
加载 DF 后,我要做的第一件事是连接两个字符串列。我可以成功地做到这一点,但我注意到单元执行时间似乎并没有随着工作人员数量的增加而减少(我在具有 64 个逻辑内核的机器上尝试了 5、10 和 20)。如果有的话,每五个左右的工人似乎就会增加一分钟的执行时间。
与此同时,Dask 仪表板的进度条表明该任务与工人数量的比例很好。在 5 名工人的情况下,任务在大约 10-15 分钟内完成(根据仪表板)。在 20 名工人的情况下,流可视化表明任务在大约 3-5 分钟内完成。但是 cell 执行时间保持在 25 分钟左右,即在 5 名工人的情况下,cell 似乎会额外挂起 10-15 分钟。流结束后;在 20 名工人的情况下——再多 20-22 分钟,据我所知,没有工人 activity 的证据。
这是我的代码 运行:
import dask
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=20)
client = Client(cluster)
df = dd.read_csv('df_name.csv', dtype={'col1': 'object', 'col2': 'object'})
with ProgressBar():
df["col_merged"] = df["col3"]+df["col4"]
df = df.compute()
Python版本:3.9.1 Dask版本:2021.06.2
我错过了什么?这仅仅是让 Dask 协调多个工作人员的开销吗?
运行
df = df.compute()
将尝试将所有 250M 行加载到内存中。如果这对你的机器可行,你仍然会花费很多时间,因为每个工作人员都要发送他们的块,所以会有很多数据传输...
核心思想是只将减少的计算结果放入内存,并在此之前将工作量分配给工作人员。
要添加到@SultanOrazbayev 的回答中,在所有任务完成后需要时间的具体事情是将数据从工作人员复制到您的客户端进程到 assemble 单个内存数据帧你所要求的。这不是一个“任务”,因为所有的计算都已经发生了,并且不能很好地并行化,因为客户端是从工作线程中提取数据的单线程。
与上面的评论一样:如果你想实现并行性,你需要在 workers 中加载数据(dd.read_csv 这样做)并在 workers 中对它们进行操作以获得你的结果。你应该放在 .compute()
比较小的东西上。相反,如果您的数据首先轻松地进入内存,那么参与 dask 可能没有任何好处,只需使用 pandas.