Dask 在 groupby 期间进行了超出必要的计算
Dask computes more than necessary during groupby
在我看来,Dask 在使用 DataFrame.groupby 时做了比必要更多的工作。
如果我设置了一些数据并且只想要结果操作的一个子集,
import dask
df = dask.datasets.timeseries(partition_freq='2W').set_index('id').persist()
task = df.groupby('id')['x'].sum().head(compute=False)
Dask 似乎想使用所有分区进行计算:
task.visualize(optimize_graph=True, size='8')
Groupby 自动将数据帧折叠成一个分区,我想 head 不能在这方面追溯更改图表。
我希望也许 split_out
可以帮助解决这个问题,但事实并非如此。
task = df.groupby('id')['x'].sum(split_out=2).head(compute=False)
task.visualize(optimize_graph=True, size='8')
因此,似乎在这种特殊情况下,Dask 做了(可能显着)多于必要的工作。
问题
因此我的问题是
- 我对情况的评估是否正确? Dask 确实做了更多的工作吗?如果不是,我怎么知道?
- 如果我是对的,我该怎么做才能告诉 Dask 只计算需要的内容?
我的实际问题当然要复杂得多,但我希望这些玩具示例能帮助阐明我的问题。
看来 map_partitions
是你的朋友,即使是 groupby-with-index 操作!
import dask
# Warning: might be too big, scale according to your machine
df = (dask.datasets.timeseries(end='2002-01-30', partition_freq='2W')
.set_index('id')
.persist())
task_a = df.groupby('id')['x'].sum().head(compute=False)
%time task_a.compute()
CPU times: user 22.5 ms, sys: 843 µs, total: 23.3 ms
Wall time: 161 ms
task_b = df.map_partitions(lambda x: x.groupby('id')['x'].sum()).head(compute=False)
%time task_b.compute()
CPU times: user 11.6 ms, sys: 0 ns, total: 11.6 ms
Wall time: 19.6 ms
得到这个更合理的图表(仅显示具有 2 个分区的原始数据)
在我看来,Dask 在使用 DataFrame.groupby 时做了比必要更多的工作。
如果我设置了一些数据并且只想要结果操作的一个子集,
import dask
df = dask.datasets.timeseries(partition_freq='2W').set_index('id').persist()
task = df.groupby('id')['x'].sum().head(compute=False)
Dask 似乎想使用所有分区进行计算:
task.visualize(optimize_graph=True, size='8')
Groupby 自动将数据帧折叠成一个分区,我想 head 不能在这方面追溯更改图表。
我希望也许 split_out
可以帮助解决这个问题,但事实并非如此。
task = df.groupby('id')['x'].sum(split_out=2).head(compute=False)
task.visualize(optimize_graph=True, size='8')
因此,似乎在这种特殊情况下,Dask 做了(可能显着)多于必要的工作。
问题
因此我的问题是
- 我对情况的评估是否正确? Dask 确实做了更多的工作吗?如果不是,我怎么知道?
- 如果我是对的,我该怎么做才能告诉 Dask 只计算需要的内容?
我的实际问题当然要复杂得多,但我希望这些玩具示例能帮助阐明我的问题。
看来 map_partitions
是你的朋友,即使是 groupby-with-index 操作!
import dask
# Warning: might be too big, scale according to your machine
df = (dask.datasets.timeseries(end='2002-01-30', partition_freq='2W')
.set_index('id')
.persist())
task_a = df.groupby('id')['x'].sum().head(compute=False)
%time task_a.compute()
CPU times: user 22.5 ms, sys: 843 µs, total: 23.3 ms
Wall time: 161 ms
task_b = df.map_partitions(lambda x: x.groupby('id')['x'].sum()).head(compute=False)
%time task_b.compute()
CPU times: user 11.6 ms, sys: 0 ns, total: 11.6 ms
Wall time: 19.6 ms
得到这个更合理的图表(仅显示具有 2 个分区的原始数据)