使用谓词过滤来自 pyarrow.parquet.ParquetDataset 的行
Using predicates to filter rows from pyarrow.parquet.ParquetDataset
我有一个 parquet 数据集存储在 s3 上,我想查询数据集中的特定行。我能够使用 petastorm
做到这一点,但现在我只想使用 pyarrow
.
这是我的尝试:
import pyarrow.parquet as pq
import s3fs
fs = s3fs.S3FileSystem()
dataset = pq.ParquetDataset(
'analytics.xxx',
filesystem=fs,
validate_schema=False,
filters=[('event_name', '=', 'SomeEvent')]
)
df = dataset.read_pandas().to_pandas()
但是那个 returns 一个 pandas DataFrame 就好像过滤器不起作用一样,即我有具有各种值 event_name
的行。有什么我想念的或我误解的东西吗?我可以在获得 pandas DataFrame 后进行过滤,但我会使用比需要更多的内存 space。
目前,filters
功能仅在文件级别实现,尚未在行级别实现。
因此,如果您的数据集是嵌套层次结构中多个分区 parquet 文件的集合(此处描述的分区数据集类型:https://arrow.apache.org/docs/python/parquet.html#partitioned-datasets-multiple-files),您可以使用 filters
参数只读取文件的一个子集。
但是,您还不能使用它来仅读取单个文件的行组的子集(参见 https://issues.apache.org/jira/browse/ARROW-1796)。
但是,如果您收到指定此类无效过滤器的错误消息,那就太好了。我为此开了一个问题:https://issues.apache.org/jira/browse/ARROW-5572
对于 python 3.6+ AWS 有一个名为 aws-data-wrangler 的库,它有助于 Pandas/S3/Parquet 之间的集成,它允许您过滤分区的 S3 密钥。
安装做;
pip install awswrangler
为了减少您读取的数据,您可以根据存储在 s3 上的镶木地板文件中的分区列过滤行。
要过滤分区列 event_name
中值为 "SomeEvent"
的行,请执行;
对于 awswrangler < 1.0.0
import awswrangler as wr
df = wr.pandas.read_parquet(
path="s3://my-bucket/my/path/to/parquet-file.parquet",
columns=["event_name"],
filters=[('event_name', '=', 'SomeEvent')]
)
对于 awswrangler > 1.0.0 做;
import awswrangler as wr
df = wr.s3.read_parquet(
path="s3://my-bucket/my/path/to/parquet-file.parquet",
columns=["event_name"],
filters=[('event_name', '=', 'SomeEvent')]
)
对于从 Google 来到这里的任何人,您现在可以在读取 Parquet 文件时过滤 PyArrow 中的行。无论您是通过 pandas 还是 pyarrow.parquet.
阅读它
filters (List[Tuple] or List[List[Tuple]] or None (default)) –
Rows which do not match the filter predicate will be removed from scanned data. Partition keys embedded in a nested directory structure will be exploited to avoid loading files at all if they contain no matching rows. If use_legacy_dataset is True, filters can only reference partition keys and only a hive-style directory structure is supported. When setting use_legacy_dataset to False, also within-file level filtering and different partitioning schemes are supported.
Predicates are expressed in disjunctive normal form (DNF), like [[('x', '=', 0), ...], ...]. DNF allows arbitrary boolean logical combinations of single column predicates. The innermost tuples each describe a single column predicate. The list of inner predicates is interpreted as a conjunction (AND), forming a more selective and multiple column predicate. Finally, the most outer list combines these filters as a disjunction (OR).
Predicates may also be passed as List[Tuple]. This form is interpreted as a single conjunction. To express OR in predicates, one must use the (preferred) List[List[Tuple]] notation.
注意:我已将其扩展为 Python 和 this post
中的 Parquet 的综合指南
Parquet 格式分区
为了使用过滤器,您需要使用分区以 Parquet 格式存储数据。从许多列和分区中加载一些 Parquet 列和分区可以显着提高 I/O Parquet 与 CSV 的性能。 Parquet 可以根据一个或多个字段的值对文件进行分区,并为嵌套值的唯一组合创建一个目录树,或者为一个分区列创建一组目录。 PySpark Parquet documentation 解释了 Parquet 的工作原理。
关于性别和国家的分区看起来像 this:
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
如果你需要进一步分区你的数据,也有行组分区,但大多数工具只支持指定行组大小,你必须自己做 key-->row group
查找,这很难看(乐于回答关于另一个问题)。
用Pandas
写分区
您需要使用 Parquet 对数据进行分区,然后您可以使用过滤器加载它。您可以使用 PyArrow 将数据写入分区,pandas 或 Dask or PySpark 用于大型数据集。
例如在pandas中写入分区:
df.to_parquet(
path='analytics.xxx',
engine='pyarrow',
compression='snappy',
columns=['col1', 'col5'],
partition_cols=['event_name', 'event_category']
)
文件布局如下:
analytics.xxx/event_name=SomeEvent/event_category=SomeCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=SomeEvent/event_category=OtherCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=OtherEvent/event_category=SomeCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=OtherEvent/event_category=OtherCategory/part-0001.c000.snappy.parquet
在 PyArrow 中加载 Parquet 分区
要使用分区列按一个 属性 抓取事件,请将元组过滤器放入列表中:
import pyarrow.parquet as pq
import s3fs
fs = s3fs.S3FileSystem()
dataset = pq.ParquetDataset(
's3://analytics.xxx',
filesystem=fs,
validate_schema=False,
filters=[('event_name', '=', 'SomeEvent')]
)
df = dataset.to_table(
columns=['col1', 'col5']
).to_pandas()
使用逻辑与筛选
要使用 AND 获取具有两个或更多属性的事件,您只需创建一个过滤器元组列表:
import pyarrow.parquet as pq
import s3fs
fs = s3fs.S3FileSystem()
dataset = pq.ParquetDataset(
's3://analytics.xxx',
filesystem=fs,
validate_schema=False,
filters=[
('event_name', '=', 'SomeEvent'),
('event_category', '=', 'SomeCategory')
]
)
df = dataset.to_table(
columns=['col1', 'col5']
).to_pandas()
使用逻辑 OR 过滤
要使用 OR 获取两个事件,您需要将过滤器元组嵌套在它们自己的列表中:
import pyarrow.parquet as pq
import s3fs
fs = s3fs.S3FileSystem()
dataset = pq.ParquetDataset(
's3://analytics.xxx',
filesystem=fs,
validate_schema=False,
filters=[
[('event_name', '=', 'SomeEvent')],
[('event_name', '=', 'OtherEvent')]
]
)
df = dataset.to_table(
columns=['col1', 'col5']
).to_pandas()
使用 AWS Data Wrangler 加载 Parquet 分区
正如提到的另一个答案,无论数据位于何处(本地或云中),将数据过滤加载到某些分区中的某些列的最简单方法是使用 awswrangler
module. If you're using S3, check out the documentation for awswrangler.s3.read_parquet()
and awswrangler.s3.to_parquet()
。过滤与上面的示例相同。
import awswrangler as wr
df = wr.s3.read_parquet(
path="analytics.xxx",
columns=["event_name"],
filters=[('event_name', '=', 'SomeEvent')]
)
使用 pyarrow.parquet.read_table()
加载 Parquet 分区
如果你使用 PyArrow,你也可以使用 pyarrow.parquet.read_table()
:
import pyarrow.parquet as pq
fp = pq.read_table(
source='analytics.xxx',
use_threads=True,
columns=['some_event', 'some_category'],
filters=[('event_name', '=', 'SomeEvent')]
)
df = fp.to_pandas()
使用 PySpark 加载 Parquet 分区
最后,在 PySpark 中您可以使用 pyspark.sql.DataFrameReader.read_parquet()
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
.appName('Stack Overflow Example Parquet Column Load') \
.getOrCreate()
# I automagically employ Parquet structure to load the selected columns and partitions
df = spark.read.parquet('s3://analytics.xxx') \
.select('event_name', 'event_category') \
.filter(F.col('event_name') == 'SomeEvent')
希望这对您使用 Parquet 有所帮助:)
我有一个 parquet 数据集存储在 s3 上,我想查询数据集中的特定行。我能够使用 petastorm
做到这一点,但现在我只想使用 pyarrow
.
这是我的尝试:
import pyarrow.parquet as pq
import s3fs
fs = s3fs.S3FileSystem()
dataset = pq.ParquetDataset(
'analytics.xxx',
filesystem=fs,
validate_schema=False,
filters=[('event_name', '=', 'SomeEvent')]
)
df = dataset.read_pandas().to_pandas()
但是那个 returns 一个 pandas DataFrame 就好像过滤器不起作用一样,即我有具有各种值 event_name
的行。有什么我想念的或我误解的东西吗?我可以在获得 pandas DataFrame 后进行过滤,但我会使用比需要更多的内存 space。
目前,filters
功能仅在文件级别实现,尚未在行级别实现。
因此,如果您的数据集是嵌套层次结构中多个分区 parquet 文件的集合(此处描述的分区数据集类型:https://arrow.apache.org/docs/python/parquet.html#partitioned-datasets-multiple-files),您可以使用 filters
参数只读取文件的一个子集。
但是,您还不能使用它来仅读取单个文件的行组的子集(参见 https://issues.apache.org/jira/browse/ARROW-1796)。
但是,如果您收到指定此类无效过滤器的错误消息,那就太好了。我为此开了一个问题:https://issues.apache.org/jira/browse/ARROW-5572
对于 python 3.6+ AWS 有一个名为 aws-data-wrangler 的库,它有助于 Pandas/S3/Parquet 之间的集成,它允许您过滤分区的 S3 密钥。
安装做;
pip install awswrangler
为了减少您读取的数据,您可以根据存储在 s3 上的镶木地板文件中的分区列过滤行。
要过滤分区列 event_name
中值为 "SomeEvent"
的行,请执行;
对于 awswrangler < 1.0.0
import awswrangler as wr
df = wr.pandas.read_parquet(
path="s3://my-bucket/my/path/to/parquet-file.parquet",
columns=["event_name"],
filters=[('event_name', '=', 'SomeEvent')]
)
对于 awswrangler > 1.0.0 做;
import awswrangler as wr
df = wr.s3.read_parquet(
path="s3://my-bucket/my/path/to/parquet-file.parquet",
columns=["event_name"],
filters=[('event_name', '=', 'SomeEvent')]
)
对于从 Google 来到这里的任何人,您现在可以在读取 Parquet 文件时过滤 PyArrow 中的行。无论您是通过 pandas 还是 pyarrow.parquet.
阅读它filters (List[Tuple] or List[List[Tuple]] or None (default)) – Rows which do not match the filter predicate will be removed from scanned data. Partition keys embedded in a nested directory structure will be exploited to avoid loading files at all if they contain no matching rows. If use_legacy_dataset is True, filters can only reference partition keys and only a hive-style directory structure is supported. When setting use_legacy_dataset to False, also within-file level filtering and different partitioning schemes are supported.
Predicates are expressed in disjunctive normal form (DNF), like [[('x', '=', 0), ...], ...]. DNF allows arbitrary boolean logical combinations of single column predicates. The innermost tuples each describe a single column predicate. The list of inner predicates is interpreted as a conjunction (AND), forming a more selective and multiple column predicate. Finally, the most outer list combines these filters as a disjunction (OR).
Predicates may also be passed as List[Tuple]. This form is interpreted as a single conjunction. To express OR in predicates, one must use the (preferred) List[List[Tuple]] notation.
注意:我已将其扩展为 Python 和 this post
中的 Parquet 的综合指南Parquet 格式分区
为了使用过滤器,您需要使用分区以 Parquet 格式存储数据。从许多列和分区中加载一些 Parquet 列和分区可以显着提高 I/O Parquet 与 CSV 的性能。 Parquet 可以根据一个或多个字段的值对文件进行分区,并为嵌套值的唯一组合创建一个目录树,或者为一个分区列创建一组目录。 PySpark Parquet documentation 解释了 Parquet 的工作原理。
关于性别和国家的分区看起来像 this:
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
如果你需要进一步分区你的数据,也有行组分区,但大多数工具只支持指定行组大小,你必须自己做 key-->row group
查找,这很难看(乐于回答关于另一个问题)。
用Pandas
写分区您需要使用 Parquet 对数据进行分区,然后您可以使用过滤器加载它。您可以使用 PyArrow 将数据写入分区,pandas 或 Dask or PySpark 用于大型数据集。
例如在pandas中写入分区:
df.to_parquet(
path='analytics.xxx',
engine='pyarrow',
compression='snappy',
columns=['col1', 'col5'],
partition_cols=['event_name', 'event_category']
)
文件布局如下:
analytics.xxx/event_name=SomeEvent/event_category=SomeCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=SomeEvent/event_category=OtherCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=OtherEvent/event_category=SomeCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=OtherEvent/event_category=OtherCategory/part-0001.c000.snappy.parquet
在 PyArrow 中加载 Parquet 分区
要使用分区列按一个 属性 抓取事件,请将元组过滤器放入列表中:
import pyarrow.parquet as pq
import s3fs
fs = s3fs.S3FileSystem()
dataset = pq.ParquetDataset(
's3://analytics.xxx',
filesystem=fs,
validate_schema=False,
filters=[('event_name', '=', 'SomeEvent')]
)
df = dataset.to_table(
columns=['col1', 'col5']
).to_pandas()
使用逻辑与筛选
要使用 AND 获取具有两个或更多属性的事件,您只需创建一个过滤器元组列表:
import pyarrow.parquet as pq
import s3fs
fs = s3fs.S3FileSystem()
dataset = pq.ParquetDataset(
's3://analytics.xxx',
filesystem=fs,
validate_schema=False,
filters=[
('event_name', '=', 'SomeEvent'),
('event_category', '=', 'SomeCategory')
]
)
df = dataset.to_table(
columns=['col1', 'col5']
).to_pandas()
使用逻辑 OR 过滤
要使用 OR 获取两个事件,您需要将过滤器元组嵌套在它们自己的列表中:
import pyarrow.parquet as pq
import s3fs
fs = s3fs.S3FileSystem()
dataset = pq.ParquetDataset(
's3://analytics.xxx',
filesystem=fs,
validate_schema=False,
filters=[
[('event_name', '=', 'SomeEvent')],
[('event_name', '=', 'OtherEvent')]
]
)
df = dataset.to_table(
columns=['col1', 'col5']
).to_pandas()
使用 AWS Data Wrangler 加载 Parquet 分区
正如提到的另一个答案,无论数据位于何处(本地或云中),将数据过滤加载到某些分区中的某些列的最简单方法是使用 awswrangler
module. If you're using S3, check out the documentation for awswrangler.s3.read_parquet()
and awswrangler.s3.to_parquet()
。过滤与上面的示例相同。
import awswrangler as wr
df = wr.s3.read_parquet(
path="analytics.xxx",
columns=["event_name"],
filters=[('event_name', '=', 'SomeEvent')]
)
使用 pyarrow.parquet.read_table()
加载 Parquet 分区
如果你使用 PyArrow,你也可以使用 pyarrow.parquet.read_table()
:
import pyarrow.parquet as pq
fp = pq.read_table(
source='analytics.xxx',
use_threads=True,
columns=['some_event', 'some_category'],
filters=[('event_name', '=', 'SomeEvent')]
)
df = fp.to_pandas()
使用 PySpark 加载 Parquet 分区
最后,在 PySpark 中您可以使用 pyspark.sql.DataFrameReader.read_parquet()
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
.appName('Stack Overflow Example Parquet Column Load') \
.getOrCreate()
# I automagically employ Parquet structure to load the selected columns and partitions
df = spark.read.parquet('s3://analytics.xxx') \
.select('event_name', 'event_category') \
.filter(F.col('event_name') == 'SomeEvent')
希望这对您使用 Parquet 有所帮助:)