读/写 Parquet 文件而不读入内存(使用 Python)

Read / Write Parquet files without reading into memory (using Python)


我查看了我希望满足我的需求的标准文档 (Apache Arrow and Pandas),但我似乎无法理解。

我最了解Python,所以我想使用Python,但这不是严格要求。

问题

我需要将 Parquet 文件从一个位置 (URL) 移动到另一个位置(一个 Azure 存储帐户,在本例中使用 Azure 机器学习平台,但这与我的问题无关)。

这些文件太大,无法简单地执行 pd.read_parquet("https://my-file-location.parquet"),因为这会将整个文件读入一个对象。

预期

我认为必须有一种简单 的方法来创建文件对象并逐行流式传输该对象——或者逐列逐块地传输。像

import pyarrow.parquet as pq

with pq.open("https://my-file-location.parquet") as read_file_handle:
    with pq.open("https://my-azure-storage-account/my-file.parquet", "write") as write_filehandle:
        for next_line in read_file_handle{
            write_file_handle.append(next_line)

我知道它会有点不同,因为 Parquet 主要是为了以柱状方式访问。也许我会传递某种配置对象,它指定感兴趣的列,或者可能在一个块或类似的东西中可以抓取多少行。

但关键的期望是 有一种方法可以访问 parquet 文件而无需将其全部加载到内存中。我该怎么做?

FWIW,我确实尝试只使用 Python 的标准 open 函数,但我不确定如何将 open 与 URL 位置和一个字节流。如果可以仅通过 open 并跳过任何特定于 Parquet 的内容来执行此操作,那也很好。

更新

一些评论建议使用类似 bash 的脚本,例如 here。如果没有别的,我可以使用这个,但它并不理想,因为:

这是可能的,但需要一些工作,因为除了柱状 Parquet 还需要一个模式。

粗略的工作流程是:

  1. 打开一个parquet file阅读。

  2. 然后使用iter_batches增量读回行块(您也可以传递要从文件中读取的特定列以保存IO/CPU)。

  3. 然后您可以进一步转换 iter_batches 中的每个 pa.RecordBatch。完成第一批转换后,您可以获得它的 schema and create a new ParquetWriter.

  4. 对于每个转换后的批处理调用 write_table. You have to first convert it to a pa.Table

  5. 关闭文件。

Parquet 需要随机访问,因此它不能从 URI 轻松流式传输(如果您通过 HTTP FSSpec 打开文件,pyarrow 应该支持它)但我认为您可能会在写入时被阻止。

请注意,我没有指定如何在远程服务器端使用批处理的实现。
我的解决方案是:使用 pyarrow.NativeFile 将批次写入缓冲区,然后使用 pyarrow.ipc.RecordBatchFileReader

读取缓冲区

我创建了这个 2 类 来帮助您进行流媒体处理

import asyncio
from pyarrow.parquet import ParquetFile


class ParquetFileStreamer:
    """
    Attributes:
        ip_address: ip address of the distant server
        port: listening port of the distant server
        n_bytes: -1 means read whole batch
        file_source: pathlib.Path, pyarrow.NativeFile, or file-like object
        batch_size: default = 65536
        columns: list of the columns you wish to select (if None selects all)

    Example:
        >>> pfs = ParquetFileStreamer
        >>> class MyStreamer(ParquetFileStreamer)
                file_source = '/usr/fromage/camembert.parquet
                columns = ['name', 'price']
        >>> MyStreamer.start_stream()
    """
    ip_address = '192.168.1.1'
    port = 80
    n_bytes = -1

    file_source: str
    batch_size = 65536
    columns = []

    @classmethod
    def start_stream(cls):
        for batch in cls._open_parquet():
            asyncio.run(cls._stream_parquet(batch))

    @classmethod
    def _open_parquet(cls):
        return ParquetFile(cls.file_source).iter_batches(
            batch_size=cls.batch_size,
            columns=cls.columns
        )

    @classmethod
    async def _stream_parquet(cls, batch):
        reader, writer = await asyncio.open_connection(cls.ip_address, cls.port)
        writer.write(batch)
        await writer.drain()
        await reader.read()
        writer.close()
        await writer.wait_closed()


class ParquetFileReceiver:
    """
    Attributes: \n
        port: specify the port \n
        n_bytes: -1 reads all the batch
    Example:
        >>> pfr = ParquetFileReceiver
        >>> asyncio.run(pfr.server())
    """
    port = 80
    n_bytes = -1

    @classmethod
    async def handle_stream(cls, reader, writer):
        data = await reader.read(cls.n_bytes)
        batch = data.decode()
        print(batch)

    @classmethod
    async def server(cls):
        server = await asyncio.start_server(cls.handle_stream, port=cls.port)
        async with server:
            await server.serve_forever()

太棒了 post,根据@Micah 的回答,我将我的 2 美分放入其中,以防您不想阅读文档。一个小片段如下:

import pandas as pd
import numpy as np
from pyarrow.parquet import ParquetFile

# create a random df then save to parquet
df = pd.DataFrame({
    'A': np.arange(10000),
    'B': np.arange(10000),
    'C': np.arange(10000),
    'D': np.arange(10000),
})
df.to_parquet('./test/test')

# ****** below is @Micah Kornfield's answer ******
# 1. open parquet file
batch = ParquetFile('./test/test')
# 2. define generator of batches
record = batch.iter_batches(
    batch_size=10,
    columns=['B', 'C'],
)
# 3. yield pandas/numpy data
print(next(record).to_pandas()) # pandas
print(next(record).to_pydict()) # native python dict