使用 pyarrow 从分区镶木地板数据集中读取特定分区

Reading specific partitions from a partitioned parquet dataset with pyarrow

我有一个 parquet 格式的分区数据集(约 20 GB)。我想使用 pyarrow 从数据集中读取特定分区。我以为我可以用 pyarrow.parquet.ParquetDataset 来完成这个,但事实似乎并非如此。这是一个小例子来说明我想要什么。

创建随机数据集:

from collections import OrderedDict
from itertools import product, chain
from uuid import uuid4
import os
from glob import glob

import numpy as np
import pandas as pd
import pyarrow as pa
from pyarrow.parquet import ParquetWriter, ParquetDataset


def get_partitions(basepath, partitions):
    """Generate directory hierarchy for a paritioned dataset

    data
    ├── part1=foo
    │   └── part2=True
    ├── part1=foo
    │   └── part2=False
    ├── part1=bar
    │   └── part2=True
    └── part1=bar
        └── part2=False

    """
    path_tmpl = '/'.join(['{}={}'] * len(partitions))  # part=value
    path_tmpl = '{}/{}'.format(basepath, path_tmpl)    # part1=val/part2=val

    parts = [product([part], vals) for part, vals in partitions.items()]
    parts = [i for i in product(*parts)]
    return [path_tmpl.format(*tuple(chain.from_iterable(i))) for i in parts]


partitions = OrderedDict(part1=['foo', 'bar'], part2=[True, False])
parts = get_partitions('data', partitions)
for part in parts:
    # 3 columns, 5 rows
    data = [pa.array(np.random.rand(5)) for i in range(3)]
    table = pa.Table.from_arrays(data, ['a', 'b', 'c'])
    os.makedirs(part, exist_ok=True)
    out = ParquetWriter('{}/{}.parquet'.format(part, uuid4()),
                        table.schema, flavor='spark')
    out.write_table(table)
    out.close()

我想读取分区 1 的所有值,并且只读取分区 2 的 True。使用 pandas.read_parquet,这是不可能的,我必须始终读取整个列。我用 pyarrow 尝试了以下操作:

parts2 = OrderedDict(part1=['foo', 'bar'], part2=[True])
parts2 = get_partitions('data', parts2)
files = [glob('{}/*'.format(dirpath)) for dirpath in parts2]
files = [i for i in chain.from_iterable(files)]
df2 = ParquetDataset(files).read().to_pandas()

这也不行:

>>> df2.columns
Index(['a', 'b', 'c'], dtype='object')

我可以像这样在 pyspark 中轻松做到这一点:

def get_spark_session_ctx(appName):
    """Get or create a Spark Session, and the underlying Context."""
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName(appName).getOrCreate()
    sc = spark.sparkContext
    return (spark, sc)


spark, sc = get_spark_session_ctx('test')
spark_df = spark.read.option('basePath', 'data').parquet(*parts2)
df3 = spark_df.toPandas()

如下图所示:

>>> df3.columns
Index(['a', 'b', 'c', 'part1', 'part2'], dtype='object')

这可以用 pyarrowpandas 完成,还是我需要一些自定义实现?

更新: 应 Wes 的要求,现在 JIRA

问题:如何使用 pyarrow 从分区的镶木地板数据集中读取特定分区?

回答:你现在不能。

您能否在 https://issues.apache.org/jira 上创建请求此功能的 Apache Arrow JIRA?

这是我们应该能够在 pyarrow API 中支持的东西,但它需要有人来实现它。谢谢

从 pyarrow 版本 0.10.0 开始,您可以使用 filters kwarg 进行查询。在您的情况下,它看起来像这样:

import pyarrow.parquet as pq
dataset = pq.ParquetDataset('path-to-your-dataset', filters=[('part2', '=', 'True'),])
table = dataset.read()

Ref