有效划分 dask 数据帧的策略
Strategy for partitioning dask dataframes efficiently
Dask 的文档讨论了重新分区以减少开销 here。
然而,它们似乎表明您需要事先了解数据框的外观(即,将有预期数据的 1/100)。
有没有什么好的方法可以在不做假设的情况下合理地重新分区?目前我只是用 npartitions = ncores * magic_number
重新分区,并在需要时将 force 设置为 True
以扩展分区。这种适用于所有方法的方法有效但绝对不是最佳选择,因为我的数据集大小不同。
数据是时间序列数据,但不幸的是不是定期的,我过去使用过按时间频率重新分区,但这不是最优的,因为数据非常不规则(有时几分钟没有,然后几千秒)
在与 mrocklin 讨论后,一个不错的分区策略是以 df.memory_usage().sum().compute()
为指导的 100MB 分区大小。对于适合 RAM 的数据集,可以通过在相关点使用 df.persist()
来减轻这可能涉及的额外工作。
补充一下 Samantha Hughes 的回答:
memory_usage()
默认忽略对象 dtype 列的内存消耗。对于我最近处理的数据集,这导致低估了大约 10 倍的内存使用量。
除非您确定没有对象 dtype 列,否则我建议指定 deep=True
,即重新分区使用:
df.repartition(npartitions= 1+df.memory_usage(deep=True).sum().compute() // n )
其中 n
是以字节为单位的目标分区大小。加 1 可确保分区数始终大于 1(//
执行楼层划分)。
从 Dask 2.0.0 开始,您可以跟注 .repartition(partition_size="100MB")
。
此方法对分区大小执行考虑对象 (.memory_usage(deep=True)
) 的细分。它将加入较小的分区,或拆分已经变得太大的分区。
Dask's Documentation 也概述了用法。
我试图检查适合我的情况的最佳数字是多少。
我有 100Gb csv 文件,其中包含 250M 行和 25 列。
我在 8 核笔记本电脑上工作。
我 运行 在 1,5,30,1000 个分区上“描述”函数
df = df.repartition(npartitions=1)
a1=df['age'].describe().compute()
df = df.repartition(npartitions=5)
a2=df['age'].describe().compute()
df = df.repartition(npartitions=30)
a3=df['age'].describe().compute()
df = df.repartition(npartitions=100)
a4=df['age'].describe().compute()
关于速度:
5,30 > 大约 3 分钟
1, 1000 > 大约 9 分钟
但是...我发现当我使用多个分区时,中位数或百分位数等“排序”函数给出了错误的数字。
1 个分区给出正确的数字(我使用 pandas 和 dask 用小数据检查了它)
Dask 的文档讨论了重新分区以减少开销 here。
然而,它们似乎表明您需要事先了解数据框的外观(即,将有预期数据的 1/100)。
有没有什么好的方法可以在不做假设的情况下合理地重新分区?目前我只是用 npartitions = ncores * magic_number
重新分区,并在需要时将 force 设置为 True
以扩展分区。这种适用于所有方法的方法有效但绝对不是最佳选择,因为我的数据集大小不同。
数据是时间序列数据,但不幸的是不是定期的,我过去使用过按时间频率重新分区,但这不是最优的,因为数据非常不规则(有时几分钟没有,然后几千秒)
在与 mrocklin 讨论后,一个不错的分区策略是以 df.memory_usage().sum().compute()
为指导的 100MB 分区大小。对于适合 RAM 的数据集,可以通过在相关点使用 df.persist()
来减轻这可能涉及的额外工作。
补充一下 Samantha Hughes 的回答:
memory_usage()
默认忽略对象 dtype 列的内存消耗。对于我最近处理的数据集,这导致低估了大约 10 倍的内存使用量。
除非您确定没有对象 dtype 列,否则我建议指定 deep=True
,即重新分区使用:
df.repartition(npartitions= 1+df.memory_usage(deep=True).sum().compute() // n )
其中 n
是以字节为单位的目标分区大小。加 1 可确保分区数始终大于 1(//
执行楼层划分)。
从 Dask 2.0.0 开始,您可以跟注 .repartition(partition_size="100MB")
。
此方法对分区大小执行考虑对象 (.memory_usage(deep=True)
) 的细分。它将加入较小的分区,或拆分已经变得太大的分区。
Dask's Documentation 也概述了用法。
我试图检查适合我的情况的最佳数字是多少。 我有 100Gb csv 文件,其中包含 250M 行和 25 列。 我在 8 核笔记本电脑上工作。 我 运行 在 1,5,30,1000 个分区上“描述”函数
df = df.repartition(npartitions=1)
a1=df['age'].describe().compute()
df = df.repartition(npartitions=5)
a2=df['age'].describe().compute()
df = df.repartition(npartitions=30)
a3=df['age'].describe().compute()
df = df.repartition(npartitions=100)
a4=df['age'].describe().compute()
关于速度:
5,30 > 大约 3 分钟
1, 1000 > 大约 9 分钟
但是...我发现当我使用多个分区时,中位数或百分位数等“排序”函数给出了错误的数字。
1 个分区给出正确的数字(我使用 pandas 和 dask 用小数据检查了它)