map_partitions 在 parquet 中存储 dask 数据帧时运行两次并计算记录
map_partitions runs twice when storing dask dataframe in parquet and records are counted
我有一个 dask 进程,运行在每个数据帧分区上都有一个函数。我让 to_parquet
做
compute()
运行 的功能。
但我还需要知道 parquet
table 中的记录数。为此,我使用 ddf.map_partitions(len)
。问题是,当我计算记录数时,compute()
在数据帧上再次完成,这使得 map_partitions 函数再次 运行。
运行map_partitions应该如何处理,将结果保存在parquet中,统计记录条数?
def some_func(df):
df['abc'] = df['def'] * 10
return df
client = Client('127.0.0.1:8786')
ddf.map_partitions(some_func) # some_func executes twice for each partition
ddf.to_parquet('/some/folder/data', engine='pyarrow')
total = ddf.map_partitions(len).compute().sum()
一个潜在的问题是行:
ddf.map_partitions(some_func)
这里dask的指令是映射分区,但是没有指令存储这个操作的结果。因此,代码可能应该是:
# this will store the modified dataframe as ddf
ddf = ddf.map_partitions(some_func)
接下来,运行 .to_parquet
将评估(计算)数据帧 ddf
并将其从内存中丢弃,因此随后的 .compute
将 re-compute数据框。
这可能很昂贵,因此一些可能的解决方案是:
- 如果
ddf
可以放入workers的内存中,那么ddf
可以被持久化以避免重新计算:
ddf = ddf.map_partitions(some_func)
ddf = ddf.persist()
ddf.to_parquet('/some/folder/data', engine='pyarrow')
total = ddf.map_partitions(len).compute().sum()
- re-defining
ddf
在计算数据方面。这里,dask
将使用存储的镶木地板文件(已计算)并从中加载信息:
ddf = ddf.map_partitions(some_func)
ddf = ddf.persist()
ddf.to_parquet('/some/folder/data', engine='pyarrow')
# create a new ddf based on the computed values
ddf = dd.read_parquet('/some/folder/data')
total = ddf.map_partitions(len).compute().sum()
- 另一个解决方案可能是修改
some_func
以动态存储结果和 return len
。粗略的伪代码是:
path_out = 'some/template_path/for/parquet/{number}.parquet'
def some_func(df, partition_info=None):
df['abc'] = df['def'] * 10
# this uses partition number to create an appropriate path
path_save_parquet = path_out.format(number=partition_info['number'])
df.to_parquet(path_save_parquet)
return len(df)
# this will compute total length and save the computation in the process
total = ddf.map_partitions(some_func).compute().sum()
我有一个 dask 进程,运行在每个数据帧分区上都有一个函数。我让 to_parquet
做
compute()
运行 的功能。
但我还需要知道 parquet
table 中的记录数。为此,我使用 ddf.map_partitions(len)
。问题是,当我计算记录数时,compute()
在数据帧上再次完成,这使得 map_partitions 函数再次 运行。
运行map_partitions应该如何处理,将结果保存在parquet中,统计记录条数?
def some_func(df):
df['abc'] = df['def'] * 10
return df
client = Client('127.0.0.1:8786')
ddf.map_partitions(some_func) # some_func executes twice for each partition
ddf.to_parquet('/some/folder/data', engine='pyarrow')
total = ddf.map_partitions(len).compute().sum()
一个潜在的问题是行:
ddf.map_partitions(some_func)
这里dask的指令是映射分区,但是没有指令存储这个操作的结果。因此,代码可能应该是:
# this will store the modified dataframe as ddf
ddf = ddf.map_partitions(some_func)
接下来,运行 .to_parquet
将评估(计算)数据帧 ddf
并将其从内存中丢弃,因此随后的 .compute
将 re-compute数据框。
这可能很昂贵,因此一些可能的解决方案是:
- 如果
ddf
可以放入workers的内存中,那么ddf
可以被持久化以避免重新计算:
ddf = ddf.map_partitions(some_func)
ddf = ddf.persist()
ddf.to_parquet('/some/folder/data', engine='pyarrow')
total = ddf.map_partitions(len).compute().sum()
- re-defining
ddf
在计算数据方面。这里,dask
将使用存储的镶木地板文件(已计算)并从中加载信息:
ddf = ddf.map_partitions(some_func)
ddf = ddf.persist()
ddf.to_parquet('/some/folder/data', engine='pyarrow')
# create a new ddf based on the computed values
ddf = dd.read_parquet('/some/folder/data')
total = ddf.map_partitions(len).compute().sum()
- 另一个解决方案可能是修改
some_func
以动态存储结果和 returnlen
。粗略的伪代码是:
path_out = 'some/template_path/for/parquet/{number}.parquet'
def some_func(df, partition_info=None):
df['abc'] = df['def'] * 10
# this uses partition number to create an appropriate path
path_save_parquet = path_out.format(number=partition_info['number'])
df.to_parquet(path_save_parquet)
return len(df)
# this will compute total length and save the computation in the process
total = ddf.map_partitions(some_func).compute().sum()