
map_partitions runs twice when storing a dask dataframe to parquet and counting the records

I have a dask process that runs a function on each dataframe partition. I rely on to_parquet to do the compute() that runs the function.

But I also need to know the number of records in the parquet table. For that, I use ddf.map_partitions(len). The problem is that when I count the records, a compute() is done on the dataframe again, which makes the map_partitions function run a second time.

How should this be handled, so that map_partitions runs once, the result is saved to parquet, and the records are counted?

import dask.dataframe as dd
from dask.distributed import Client

def some_func(df):
    df['abc'] = df['def'] * 10
    return df

client = Client('127.0.0.1:8786')

ddf.map_partitions(some_func) # some_func executes twice for each partition

ddf.to_parquet('/some/folder/data', engine='pyarrow') 

total = ddf.map_partitions(len).compute().sum() 

One potential problem is the line:

ddf.map_partitions(some_func)

Here dask is instructed to map over the partitions, but there is no instruction to store the result of this operation. So the code should probably be:

# this will store the modified dataframe as ddf
ddf = ddf.map_partitions(some_func)

Next, running .to_parquet will evaluate (compute) the dataframe ddf and discard it from memory, so the subsequent .compute will re-compute the dataframe.
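A quick way to see this is to put a side effect inside the mapped function. The sketch below is illustrative only: traced_func, the ten-row sample dataframe, and the /tmp/demo_parquet path are assumptions, not part of the question; the print fires once per partition for every evaluation of the graph.

import pandas as pd
import dask.dataframe as dd

def traced_func(df):
    # the print shows how often each partition is actually processed
    print(f'processing a partition of {len(df)} rows')
    df['abc'] = df['def'] * 10
    return df

pdf = pd.DataFrame({'def': range(10)})
# passing meta avoids an extra call of traced_func on an empty frame for metadata inference
ddf = dd.from_pandas(pdf, npartitions=2).map_partitions(
    traced_func, meta={'def': 'int64', 'abc': 'int64'}
)

ddf.to_parquet('/tmp/demo_parquet', engine='pyarrow')  # prints once per partition
total = ddf.map_partitions(len).compute().sum()        # prints once per partition again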

This can be expensive, so some possible solutions are listed below (a single-pass variant is sketched after the list):

  1. If ddf fits into the workers' memory, then it can be persisted to avoid re-computation:
ddf = ddf.map_partitions(some_func)
ddf = ddf.persist()
ddf.to_parquet('/some/folder/data', engine='pyarrow') 
total = ddf.map_partitions(len).compute().sum()
  2. Re-defining ddf in terms of the computed data. Here, dask will use the stored parquet files (already computed) and load the information from them:
ddf = ddf.map_partitions(some_func)
ddf.to_parquet('/some/folder/data', engine='pyarrow')

# create a new ddf based on the computed values
ddf = dd.read_parquet('/some/folder/data')
total = ddf.map_partitions(len).compute().sum()
  3. Another solution could be to modify some_func to store the results on the fly and return len. Rough pseudocode:
path_out = 'some/template_path/for/parquet/{number}.parquet'

def some_func(df, partition_info=None):
    df['abc'] = df['def'] * 10
    # partition_info is None when dask calls the function on an empty frame
    # to infer metadata, so skip writing in that case
    if partition_info is None:
        return 0
    # this uses the partition number to create an appropriate path
    path_save_parquet = path_out.format(number=partition_info['number'])
    df.to_parquet(path_save_parquet)
    return len(df)

# this will compute the total length and save the data in the process
total = ddf.map_partitions(some_func).compute().sum()
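If holding ddf in workers' memory or re-reading the parquet files is undesirable, a related single-pass pattern (a minimal sketch, offered here as an assumption on top of the options above, and assuming ddf is the dataframe after map_partitions(some_func)) is to ask to_parquet for a lazy result with compute=False and compute it together with the partition lengths, so the shared graph runs only once:

import dask

ddf = ddf.map_partitions(some_func)

# build the parquet-writing task lazily instead of computing it immediately
write_task = ddf.to_parquet('/some/folder/data', engine='pyarrow', compute=False)
lengths = ddf.map_partitions(len)

# computing both together shares the common graph, so some_func runs once per partition
_, partition_lengths = dask.compute(write_task, lengths)
total = partition_lengths.sum()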