dask 计算存储结果吗?

does dask compute store results?

考虑以下代码

import dask
import dask.dataframe as dd
import pandas as pd

data_dict = {'data1':[1,2,3,4,5,6,7,8,9,10]}
df_pd     = pd.DataFrame(data_dict) 
df_dask   = dd.from_pandas(df_pd,npartitions=2)

df_dask['data1x2'] = df_dask['data1'].apply(lambda x:2*x,meta=('data1x2','int64')).compute()

print('-'*80)
print(df_dask['data1x2'])
print('-'*80)
print(df_dask['data1x2'].compute())
print('-'*80)

我想不通的是:为什么第一次和第二次打印的输出有差异?毕竟,我在应用函数时调用了计算并将结果存储在 df_dask['data1x2'].

第一次打印只会显示dask系列的lazy版本,df_dask["data1x2"]:

Dask Series Structure:
npartitions=2
0    int64
5      ...
9      ...
Name: data1x2, dtype: int64
Dask Name: getitem, 15 tasks

这显示了分区数、索引值(如果已知)、获得最终结果需要完成的任务数,以及一些其他信息。在这个阶段,dask 没有计算实际的系列,所以这个 dask 数组中的值是未知的。调用 .compute 启动计算获取实际值所需的 15 个任务,这就是第二次打印的内容。

Dask 确实将结果存储在工作程序或调度程序的内存中。但这并不是导致显示结果差异的原因。两者显示不同是因为它们是不同类型的对象。

df_dask['data1x2'] 是一个 dask.dataframe.Series,它只会显示数据结构的预览和有关计算值所涉及的任务数量的信息。显示任何数据至少需要将数据移动到主线程,如果不是计算并且可能 I/O,所以 dask 永远不会这样做,除非明确要求,例如df.head().

df_dask['data1x2'].compute() 是一个 pandas.Series。它不再与 dask 有任何关系,并且根据定义 in-memory。由于所有pandas个数据结构都在内存中,所以默认显示数据。

当你在一个 dask 对象上调用计算时,它不再是一个 dask 对象。在这种情况下,首先计算 returns 个 pandas 系列。当您将 pandas 系列分配给 dask 数据框时,dask 分区并将数据发送给 worker,然后无法再显示整个系列。所以如果你想看到显示的系列,你必须再次调用计算。

想象一下,如果您的整个数据框太大而无法放入内存,这会有多大用处,例如如果你有 1000 列和 1000 万行。这就是 dask 的设计目的。