为什么无论数据帧的大小如何,dask 都需要很长时间来计算

Why does dask take long time to compute regardless of the size of dataframe

无论数据帧大小如何,dask 数据帧都需要很长时间来计算的原因是什么。 如何避免这种情况发生?背后的原因是什么?

编辑:

我目前正在使用 ml.c5.2xlarge 实例类型处理 AWS Sagemaker,数据位于 S3 存储桶中。 我没有连接到客户端,因为我无法连接。当我 运行 客户端通过本地集群时出现此错误 --> AttributeError: MaterializedLayer' object has no attribute 'pack_annotations'

所以,我没有连接任何特定的东西,所以它现在处于默认状态。 (集群,工人:4,核心:8,内存:16.22 GB)

shape = df.shape
nrows = shape[0].compute()
print("nrows",nrows)
print(df.npartitions) 

我尝试对 24700000 条记录(~27M)执行计算,有 23 个分区,执行时间为 CPU 次:用户 4 分钟 48 秒,系统:12.9 秒,总计:5 分钟 1 秒 挂墙时间:4min 46s

对于nrows 5120000(~5M),有23个分区,执行耗时为CPU次:user 4min 50s,sys:12s,total:5min 2s 挂墙时间:4min 46s

对于nrows 7697351 (~7M) with 1 partition, 花费的时间是 CPU 次:用户 5 分钟 4 秒,系统:10.6 秒,总计:5 分钟 14 秒 挂墙时间:4分52秒

我在 Pandas 中用 7690000 (~7M) 执行了相同的操作,执行时间是 CPU 次:用户 502 微秒,系统:0 纳秒,总计:502 微秒 挂墙时间:402 微秒 对于上述所有情况,列数仍为 5

我只是想找出数据的形状,但在 Dask 中,无论操作类型如何,dask 都需要相同的时间来执行一个计算操作。

请问这背后的原因是什么,我需要做什么来避免这种情况并优化计算时间

通常,给定的计算将包含可以分布式(并行化)的部分和必须按顺序完成的部分,请参阅 Amdahl's law。如果给定的算法具有较大的串行分量,那么 distributing/scaling 的收益将会很小。

在不知道任务图的细节的情况下,很难说出到底是什么导致了瓶颈,但更广泛地说,即使输入相对较小,也可能有几个导致性能缓慢的原因:

  • 生成大量分区或涉及跨多个分区通信的计算(例如排序、一致编码分类列);
  • 慢速 IO(例如,如果数据帧必须通过使用慢速网络连接轮询远程数据库来构建);
  • 涉及具有固定构建成本的模拟的计算(例如,如果数据框是边的列表,应该对照必须为多个种子构建的特定随机图进行检查)。

上面列出的原因仍会随着数据的变化而变化,因此这并不是您问题的确切答案(“无论数据帧的大小如何”),但可能会有所帮助。

要解决(或避免)这个问题,通常必须检查 algorithm/code,确定性能瓶颈,弄清楚它们是否可以并行化,或者它们是否本质上是串行的。

dask 数据帧花费更多时间来计算(形状或任何操作)的原因是因为当调用计算操作时,dask 尝试执行从当前数据帧或其祖先的创建到计算点的操作() 被调用。

在问题中出现的场景中,dask 正在尝试从 S3 存储桶读取数据(从 s3 存储桶读取数据需要相当长的时间)。因此,当调用计算(以查找形状或任何其他操作)时,dask 会尝试通过从 s3 读取 csv 数据文件来执行所有操作,这会增加执行时间。

compute() 应尽量少用,但如果任何形式的计算操作都必须在当前或其子数据帧上反复执行,持久化数据帧会有所帮助。 persist() 允许数据存储在分布式内存中,因此它不会执行其祖先的所有操作,但它会从数据持久化的地方执行。