Dask 不从简单(非 Hive)Parquet 文件中恢复分区
Dask not recovering partitions from simple (non-Hive) Parquet files
我有一个关于 Dask+Parquet 的两部分问题。我正在尝试 运行 查询从分区的 Parquet 文件创建的 dask 数据帧,如下所示:
import pandas as pd
import dask.dataframe as dd
import fastparquet
##### Generate random data to Simulate Process creating a Parquet file ######
test_df = pd.DataFrame(data=np.random.randn(10000, 2), columns=['data1', 'data2'])
test_df['time'] = pd.bdate_range('1/1/2000', periods=test_df.shape[0], freq='1S')
# some grouping column
test_df['name'] = np.random.choice(['jim', 'bob', 'jamie'], test_df.shape[0])
##### Write to partitioned parquet file, hive and simple #####
fastparquet.write('test_simple.parquet', data=test_df, partition_on=['name'], file_scheme='simple')
fastparquet.write('test_hive.parquet', data=test_df, partition_on=['name'], file_scheme='hive')
# now check partition sizes. Only Hive version works.
assert test_df.name.nunique() == dd.read_parquet('test_hive.parquet').npartitions # works.
assert test_df.name.nunique() == dd.read_parquet('test_simple.parquet').npartitions # !!!!FAILS!!!
我的目标是能够使用 dask 并行快速过滤和处理单个分区,如下所示:
df = dd.read_parquet('test_hive.parquet')
df.map_partitions(<something>) # operate on each partition
我可以使用 Hive 风格的 Parquet 目录,但我注意到与直接从单个 parquet 文件读取相比,它的操作时间要长得多。
有人能告诉我实现这个的惯用方法吗? Dask/Parquet 还很陌生,如果这是一种混淆的方法,我们深表歉意。
也许从文档字符串中看不清楚,但是 "simple" 文件类型根本不会发生按值分区,这就是为什么它只有一个分区。
至于速度,当数据如此之小时,在单个函数调用中读取数据是最快的 - 特别是如果您打算执行 nunique
之类的任何操作,这将需要来自不同的值的组合分区。
在 Dask 中,每个任务都会产生开销,因此除非调用完成的工作量与开销相比很大,否则你可能会失败。此外,磁盘访问通常不是可并行化的,如果它们持有 GIL,某些计算部分可能无法 运行 在线程中并行。 最后,分区版本包含更多要解析的镶木地板元数据。
>>> len(dd.read_parquet('test_hive.parquet').name.nunique())
12
>>> len(dd.read_parquet('test_simple.parquet').name.nunique())
6
TL;DR: 确保你的分区足够大以保持 dask 忙碌。
(注意:一组唯一值已经从 parquet 元数据中显而易见,根本不需要加载数据;但 Dask 不知道如何进行此优化,毕竟,一些分区可能包含零行)
我有一个关于 Dask+Parquet 的两部分问题。我正在尝试 运行 查询从分区的 Parquet 文件创建的 dask 数据帧,如下所示:
import pandas as pd
import dask.dataframe as dd
import fastparquet
##### Generate random data to Simulate Process creating a Parquet file ######
test_df = pd.DataFrame(data=np.random.randn(10000, 2), columns=['data1', 'data2'])
test_df['time'] = pd.bdate_range('1/1/2000', periods=test_df.shape[0], freq='1S')
# some grouping column
test_df['name'] = np.random.choice(['jim', 'bob', 'jamie'], test_df.shape[0])
##### Write to partitioned parquet file, hive and simple #####
fastparquet.write('test_simple.parquet', data=test_df, partition_on=['name'], file_scheme='simple')
fastparquet.write('test_hive.parquet', data=test_df, partition_on=['name'], file_scheme='hive')
# now check partition sizes. Only Hive version works.
assert test_df.name.nunique() == dd.read_parquet('test_hive.parquet').npartitions # works.
assert test_df.name.nunique() == dd.read_parquet('test_simple.parquet').npartitions # !!!!FAILS!!!
我的目标是能够使用 dask 并行快速过滤和处理单个分区,如下所示:
df = dd.read_parquet('test_hive.parquet')
df.map_partitions(<something>) # operate on each partition
我可以使用 Hive 风格的 Parquet 目录,但我注意到与直接从单个 parquet 文件读取相比,它的操作时间要长得多。
有人能告诉我实现这个的惯用方法吗? Dask/Parquet 还很陌生,如果这是一种混淆的方法,我们深表歉意。
也许从文档字符串中看不清楚,但是 "simple" 文件类型根本不会发生按值分区,这就是为什么它只有一个分区。
至于速度,当数据如此之小时,在单个函数调用中读取数据是最快的 - 特别是如果您打算执行 nunique
之类的任何操作,这将需要来自不同的值的组合分区。
在 Dask 中,每个任务都会产生开销,因此除非调用完成的工作量与开销相比很大,否则你可能会失败。此外,磁盘访问通常不是可并行化的,如果它们持有 GIL,某些计算部分可能无法 运行 在线程中并行。 最后,分区版本包含更多要解析的镶木地板元数据。
>>> len(dd.read_parquet('test_hive.parquet').name.nunique())
12
>>> len(dd.read_parquet('test_simple.parquet').name.nunique())
6
TL;DR: 确保你的分区足够大以保持 dask 忙碌。
(注意:一组唯一值已经从 parquet 元数据中显而易见,根本不需要加载数据;但 Dask 不知道如何进行此优化,毕竟,一些分区可能包含零行)