读/写 Parquet 文件而不读入内存(使用 Python)
Read / Write Parquet files without reading into memory (using Python)
我查看了我希望满足我的需求的标准文档 (Apache Arrow and Pandas),但我似乎无法理解。
我最了解Python,所以我想使用Python,但这不是严格要求。
问题
我需要将 Parquet 文件从一个位置 (URL) 移动到另一个位置(一个 Azure 存储帐户,在本例中使用 Azure 机器学习平台,但这与我的问题无关)。
这些文件太大,无法简单地执行 pd.read_parquet("https://my-file-location.parquet")
,因为这会将整个文件读入一个对象。
预期
我认为必须有一种简单 的方法来创建文件对象并逐行流式传输该对象——或者逐列逐块地传输。像
import pyarrow.parquet as pq
with pq.open("https://my-file-location.parquet") as read_file_handle:
with pq.open("https://my-azure-storage-account/my-file.parquet", "write") as write_filehandle:
for next_line in read_file_handle{
write_file_handle.append(next_line)
我知道它会有点不同,因为 Parquet 主要是为了以柱状方式访问。也许我会传递某种配置对象,它指定感兴趣的列,或者可能在一个块或类似的东西中可以抓取多少行。
但关键的期望是 有一种方法可以访问 parquet 文件而无需将其全部加载到内存中。我该怎么做?
FWIW,我确实尝试只使用 Python 的标准 open
函数,但我不确定如何将 open
与 URL 位置和一个字节流。如果可以仅通过 open
并跳过任何特定于 Parquet 的内容来执行此操作,那也很好。
更新
一些评论建议使用类似 bash 的脚本,例如 here。如果没有别的,我可以使用这个,但它并不理想,因为:
- 我宁愿将这一切都保存在一个完整的语言 SDK 中,无论是 Python、Go 还是其他语言。如果解决方案移动到带有管道的 bash 脚本中,则需要外部调用,因为最终解决方案不会完全用 bash、Powershell 或任何脚本语言编写。
- 我真的很想利用 Parquet 本身的一些好处。正如我在下面的评论中提到的,Parquet 是列式存储。因此,如果我有一个 11 亿行和 100 列的“数据框”,但我只关心 3 列,我希望能够只下载这 3 列,节省还有很多时间和一些钱。
这是可能的,但需要一些工作,因为除了柱状 Parquet 还需要一个模式。
粗略的工作流程是:
打开一个parquet file阅读。
然后使用iter_batches增量读回行块(您也可以传递要从文件中读取的特定列以保存IO/CPU)。
然后您可以进一步转换 iter_batches
中的每个 pa.RecordBatch
。完成第一批转换后,您可以获得它的 schema and create a new ParquetWriter.
对于每个转换后的批处理调用 write_table. You have to first convert it to a pa.Table
。
关闭文件。
Parquet 需要随机访问,因此它不能从 URI 轻松流式传输(如果您通过 HTTP FSSpec 打开文件,pyarrow 应该支持它)但我认为您可能会在写入时被阻止。
请注意,我没有指定如何在远程服务器端使用批处理的实现。
我的解决方案是:使用 pyarrow.NativeFile 将批次写入缓冲区,然后使用 pyarrow.ipc.RecordBatchFileReader
读取缓冲区
我创建了这个 2 类 来帮助您进行流媒体处理
import asyncio
from pyarrow.parquet import ParquetFile
class ParquetFileStreamer:
"""
Attributes:
ip_address: ip address of the distant server
port: listening port of the distant server
n_bytes: -1 means read whole batch
file_source: pathlib.Path, pyarrow.NativeFile, or file-like object
batch_size: default = 65536
columns: list of the columns you wish to select (if None selects all)
Example:
>>> pfs = ParquetFileStreamer
>>> class MyStreamer(ParquetFileStreamer)
file_source = '/usr/fromage/camembert.parquet
columns = ['name', 'price']
>>> MyStreamer.start_stream()
"""
ip_address = '192.168.1.1'
port = 80
n_bytes = -1
file_source: str
batch_size = 65536
columns = []
@classmethod
def start_stream(cls):
for batch in cls._open_parquet():
asyncio.run(cls._stream_parquet(batch))
@classmethod
def _open_parquet(cls):
return ParquetFile(cls.file_source).iter_batches(
batch_size=cls.batch_size,
columns=cls.columns
)
@classmethod
async def _stream_parquet(cls, batch):
reader, writer = await asyncio.open_connection(cls.ip_address, cls.port)
writer.write(batch)
await writer.drain()
await reader.read()
writer.close()
await writer.wait_closed()
class ParquetFileReceiver:
"""
Attributes: \n
port: specify the port \n
n_bytes: -1 reads all the batch
Example:
>>> pfr = ParquetFileReceiver
>>> asyncio.run(pfr.server())
"""
port = 80
n_bytes = -1
@classmethod
async def handle_stream(cls, reader, writer):
data = await reader.read(cls.n_bytes)
batch = data.decode()
print(batch)
@classmethod
async def server(cls):
server = await asyncio.start_server(cls.handle_stream, port=cls.port)
async with server:
await server.serve_forever()
太棒了 post,根据@Micah 的回答,我将我的 2 美分放入其中,以防您不想阅读文档。一个小片段如下:
import pandas as pd
import numpy as np
from pyarrow.parquet import ParquetFile
# create a random df then save to parquet
df = pd.DataFrame({
'A': np.arange(10000),
'B': np.arange(10000),
'C': np.arange(10000),
'D': np.arange(10000),
})
df.to_parquet('./test/test')
# ****** below is @Micah Kornfield's answer ******
# 1. open parquet file
batch = ParquetFile('./test/test')
# 2. define generator of batches
record = batch.iter_batches(
batch_size=10,
columns=['B', 'C'],
)
# 3. yield pandas/numpy data
print(next(record).to_pandas()) # pandas
print(next(record).to_pydict()) # native python dict
我查看了我希望满足我的需求的标准文档 (Apache Arrow and Pandas),但我似乎无法理解。
我最了解Python,所以我想使用Python,但这不是严格要求。
问题
我需要将 Parquet 文件从一个位置 (URL) 移动到另一个位置(一个 Azure 存储帐户,在本例中使用 Azure 机器学习平台,但这与我的问题无关)。
这些文件太大,无法简单地执行 pd.read_parquet("https://my-file-location.parquet")
,因为这会将整个文件读入一个对象。
预期
我认为必须有一种简单 的方法来创建文件对象并逐行流式传输该对象——或者逐列逐块地传输。像
import pyarrow.parquet as pq
with pq.open("https://my-file-location.parquet") as read_file_handle:
with pq.open("https://my-azure-storage-account/my-file.parquet", "write") as write_filehandle:
for next_line in read_file_handle{
write_file_handle.append(next_line)
我知道它会有点不同,因为 Parquet 主要是为了以柱状方式访问。也许我会传递某种配置对象,它指定感兴趣的列,或者可能在一个块或类似的东西中可以抓取多少行。
但关键的期望是 有一种方法可以访问 parquet 文件而无需将其全部加载到内存中。我该怎么做?
FWIW,我确实尝试只使用 Python 的标准 open
函数,但我不确定如何将 open
与 URL 位置和一个字节流。如果可以仅通过 open
并跳过任何特定于 Parquet 的内容来执行此操作,那也很好。
更新
一些评论建议使用类似 bash 的脚本,例如 here。如果没有别的,我可以使用这个,但它并不理想,因为:
- 我宁愿将这一切都保存在一个完整的语言 SDK 中,无论是 Python、Go 还是其他语言。如果解决方案移动到带有管道的 bash 脚本中,则需要外部调用,因为最终解决方案不会完全用 bash、Powershell 或任何脚本语言编写。
- 我真的很想利用 Parquet 本身的一些好处。正如我在下面的评论中提到的,Parquet 是列式存储。因此,如果我有一个 11 亿行和 100 列的“数据框”,但我只关心 3 列,我希望能够只下载这 3 列,节省还有很多时间和一些钱。
这是可能的,但需要一些工作,因为除了柱状 Parquet 还需要一个模式。
粗略的工作流程是:
打开一个parquet file阅读。
然后使用iter_batches增量读回行块(您也可以传递要从文件中读取的特定列以保存IO/CPU)。
然后您可以进一步转换
iter_batches
中的每个pa.RecordBatch
。完成第一批转换后,您可以获得它的 schema and create a new ParquetWriter.对于每个转换后的批处理调用 write_table. You have to first convert it to a
pa.Table
。关闭文件。
Parquet 需要随机访问,因此它不能从 URI 轻松流式传输(如果您通过 HTTP FSSpec 打开文件,pyarrow 应该支持它)但我认为您可能会在写入时被阻止。
请注意,我没有指定如何在远程服务器端使用批处理的实现。
我的解决方案是:使用 pyarrow.NativeFile 将批次写入缓冲区,然后使用 pyarrow.ipc.RecordBatchFileReader
我创建了这个 2 类 来帮助您进行流媒体处理
import asyncio
from pyarrow.parquet import ParquetFile
class ParquetFileStreamer:
"""
Attributes:
ip_address: ip address of the distant server
port: listening port of the distant server
n_bytes: -1 means read whole batch
file_source: pathlib.Path, pyarrow.NativeFile, or file-like object
batch_size: default = 65536
columns: list of the columns you wish to select (if None selects all)
Example:
>>> pfs = ParquetFileStreamer
>>> class MyStreamer(ParquetFileStreamer)
file_source = '/usr/fromage/camembert.parquet
columns = ['name', 'price']
>>> MyStreamer.start_stream()
"""
ip_address = '192.168.1.1'
port = 80
n_bytes = -1
file_source: str
batch_size = 65536
columns = []
@classmethod
def start_stream(cls):
for batch in cls._open_parquet():
asyncio.run(cls._stream_parquet(batch))
@classmethod
def _open_parquet(cls):
return ParquetFile(cls.file_source).iter_batches(
batch_size=cls.batch_size,
columns=cls.columns
)
@classmethod
async def _stream_parquet(cls, batch):
reader, writer = await asyncio.open_connection(cls.ip_address, cls.port)
writer.write(batch)
await writer.drain()
await reader.read()
writer.close()
await writer.wait_closed()
class ParquetFileReceiver:
"""
Attributes: \n
port: specify the port \n
n_bytes: -1 reads all the batch
Example:
>>> pfr = ParquetFileReceiver
>>> asyncio.run(pfr.server())
"""
port = 80
n_bytes = -1
@classmethod
async def handle_stream(cls, reader, writer):
data = await reader.read(cls.n_bytes)
batch = data.decode()
print(batch)
@classmethod
async def server(cls):
server = await asyncio.start_server(cls.handle_stream, port=cls.port)
async with server:
await server.serve_forever()
太棒了 post,根据@Micah 的回答,我将我的 2 美分放入其中,以防您不想阅读文档。一个小片段如下:
import pandas as pd
import numpy as np
from pyarrow.parquet import ParquetFile
# create a random df then save to parquet
df = pd.DataFrame({
'A': np.arange(10000),
'B': np.arange(10000),
'C': np.arange(10000),
'D': np.arange(10000),
})
df.to_parquet('./test/test')
# ****** below is @Micah Kornfield's answer ******
# 1. open parquet file
batch = ParquetFile('./test/test')
# 2. define generator of batches
record = batch.iter_batches(
batch_size=10,
columns=['B', 'C'],
)
# 3. yield pandas/numpy data
print(next(record).to_pandas()) # pandas
print(next(record).to_pydict()) # native python dict