Why does Dask seem to store Parquet inefficiently

When I save the same table to Parquet using Pandas and Dask, Pandas creates a 4 kB file while Dask creates a 39 MB file.

Create the dataframe

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import dask.dataframe as dd

n = int(1e7)
df = pd.DataFrame({'col': ['a'*64]*n})

Save it in different ways

# Pandas: 4k
df.to_parquet('example-pandas.parquet')

# PyArrow: 4k
pq.write_table(pa.Table.from_pandas(df), 'example-pyarrow.parquet')

# Dask: 39M
dd.from_pandas(df, npartitions=1).to_parquet('example-dask.parquet', compression='snappy')
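
For reference, this is roughly how the sizes above can be compared programmatically. Note that Dask writes a directory of part files, so its size is the sum of the parts; `on_disk_size` is just an illustrative helper, not part of any library:

    import os

    def on_disk_size(path):
        # Dask writes a directory of part files; sum them.
        # Pandas and PyArrow write a single file.
        if os.path.isdir(path):
            return sum(os.path.getsize(os.path.join(path, f)) for f in os.listdir(path))
        return os.path.getsize(path)

    for p in ['example-pandas.parquet', 'example-pyarrow.parquet', 'example-dask.parquet']:
        print(p, on_disk_size(p))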

At first I thought Dask was not using dictionary and run-length encoding, but that does not seem to be the case. I am not sure I am interpreting the metadata correctly, but at the very least it appears to be identical:

>>> pq.read_metadata('example-pandas.parquet').row_group(0).column(0)
<pyarrow._parquet.ColumnChunkMetaData object at 0x7fbee7d1a770>
  file_offset: 548
  file_path: 
  physical_type: BYTE_ARRAY
  num_values: 10000000
  path_in_schema: col
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x7fbee7d2cc70>
      has_min_max: True
      min: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
      max: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
      null_count: 0
      distinct_count: 0
      num_values: 10000000
      physical_type: BYTE_ARRAY
      logical_type: String
      converted_type (legacy): UTF8
  compression: SNAPPY
  encodings: ('PLAIN_DICTIONARY', 'PLAIN', 'RLE')
  has_dictionary_page: True
  dictionary_page_offset: 4
  data_page_offset: 29
  total_compressed_size: 544
  total_uncompressed_size: 596

>>> pq.read_metadata('example-dask.parquet/part.0.parquet').row_group(0).column(0)
<pyarrow._parquet.ColumnChunkMetaData object at 0x7fbee7d3d180>
  file_offset: 548
  file_path: 
  physical_type: BYTE_ARRAY
  num_values: 10000000
  path_in_schema: col
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x7fbee7d3d1d0>
      has_min_max: True
      min: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
      max: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
      null_count: 0
      distinct_count: 0
      num_values: 10000000
      physical_type: BYTE_ARRAY
      logical_type: String
      converted_type (legacy): UTF8
  compression: SNAPPY
  encodings: ('PLAIN_DICTIONARY', 'PLAIN', 'RLE')
  has_dictionary_page: True
  dictionary_page_offset: 4
  data_page_offset: 29
  total_compressed_size: 544
  total_uncompressed_size: 596

Why is the Dask-created Parquet file so much larger? Alternatively, how can I inspect it further to find out what might be wrong?

Dask appears to be saving an int64 index...

>>> meta = pq.read_metadata('example-dask.parquet/part.0.parquet')
>>> meta.row_group(0).column(1)
<pyarrow._parquet.ColumnChunkMetaData object at 0x7fa41e1babd0>
  file_offset: 40308181
  file_path: 
  physical_type: INT64
  num_values: 10000000
  path_in_schema: __null_dask_index__
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x7fa41e1badb0>
      has_min_max: True
      min: 0
      max: 9999999
      null_count: 0
      distinct_count: 0
      num_values: 10000000
      physical_type: INT64
      logical_type: None
      converted_type (legacy): NONE
  compression: SNAPPY
  encodings: ('PLAIN_DICTIONARY', 'PLAIN', 'RLE', 'PLAIN')
  has_dictionary_page: True
  dictionary_page_offset: 736
  data_page_offset: 525333
  total_compressed_size: 40307445
  total_uncompressed_size: 80284661
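
More generally, a quick way to locate this kind of bloat is to walk the row-group metadata and print each column's name and compressed size; the loop below is just an illustrative sketch using the PyArrow metadata API shown above, and it immediately surfaces the `__null_dask_index__` column:

    meta = pq.read_metadata('example-dask.parquet/part.0.parquet')
    for rg in range(meta.num_row_groups):
        for col in range(meta.num_columns):
            chunk = meta.row_group(rg).column(col)
            # Print each column's path and its compressed footprint in this row group.
            print(chunk.path_in_schema, chunk.total_compressed_size)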

You can disable this with write_index:

dd.from_pandas(df, npartitions=1).to_parquet('example-dask.parquet', compression='snappy', write_index=False)
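
To confirm, you could re-read the per-file metadata after the write_index=False call above; with the index dropped, the file should contain only the dictionary-encoded `col` column (an illustrative check, not part of the Dask API):

    meta = pq.read_metadata('example-dask.parquet/part.0.parquet')
    print(meta.num_columns)                                    # expect 1: just 'col'
    print(meta.row_group(0).column(0).total_compressed_size)   # back to a few hundred bytes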

PyArrow does not write any index.

Pandas does write an index, but at least with the pyarrow engine, a simple linear (range) index is stored as schema metadata rather than as an actual column:

>>> import json
>>> table = pq.read_table('example-pandas.parquet')
>>> pandas_meta = json.loads(table.schema.metadata[b'pandas'])
>>> pandas_meta['index_columns'][0]
{'kind': 'range', 'name': None, 'start': 0, 'stop': 10000000, 'step': 1}
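
When the file is read back, the pyarrow engine reconstructs the RangeIndex from that metadata, so no per-row index data is ever stored on disk. A quick way to see this (a minimal round-trip sketch):

    # The RangeIndex is rebuilt from the schema metadata, not from a stored column.
    round_tripped = pd.read_parquet('example-pandas.parquet')
    print(type(round_tripped.index))  # <class 'pandas.core.indexes.range.RangeIndex'>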