带有镶木地板和 pyarrow 的 http 请求
http request with parquet and pyarrow
我想使用 pyarrow read/query 来自休息服务器的镶木地板数据。目前我正在对数据进行分块,转换为 pandas,转储为 json,然后流式传输这些数据块。喜欢:
p = pq.ParquetDataset('/path/to/data.parquet', filters=filter, use_legacy_dataset=False)
batches = p._dataset.to_batches(filter=p._filter_expression)
(json.dumps(b.to_pandas().values.tolist()) for b in batches)
这实际上与
相同
ds = pq.ParquetDataset('/path/to/data.parquet',
use_legacy_dataset=False,
filters=filters)
df = ds.read().to_pandas()
data = pd.DataFrame(orjson.loads(orjson.dumps(df.values.tolist())))
没有网络io。它比直接读取 pandas
慢 50 倍
df = ds.read().to_pandas()
有没有办法将 parquet 数据集序列化为我可以通过 http 发送并在客户端解析的二进制字符串?
您可以使用内存柱状格式中的箭头发送数据。它将比 json 更加高效和紧凑。但请记住,它将是二进制数据(与 json 不同,它不是人类可读的)
有关完整示例,请参阅 doc。
在你的情况下你想做这样的事情:
ds = pq.ParquetDataset('/path/to/data.parquet',
use_legacy_dataset=False,
filters=filters)
table = ds.read() # pa.Table
# Write the data:
batches = table.to_batches()
sink = pa.BufferOutputStream()
writer = pa.ipc.new_stream(sink, table.schema)
for batch in batches:
writer.write(batch)
writer.close()
buf = sink.getvalue()
# Read the data:
reader = pa.ipc.open_stream(buf)
read_batches = [b for b in reader]
read_table = pa.Table.from_batches(read_batches)
read_table.to_pandas()
我想使用 pyarrow read/query 来自休息服务器的镶木地板数据。目前我正在对数据进行分块,转换为 pandas,转储为 json,然后流式传输这些数据块。喜欢:
p = pq.ParquetDataset('/path/to/data.parquet', filters=filter, use_legacy_dataset=False)
batches = p._dataset.to_batches(filter=p._filter_expression)
(json.dumps(b.to_pandas().values.tolist()) for b in batches)
这实际上与
相同ds = pq.ParquetDataset('/path/to/data.parquet',
use_legacy_dataset=False,
filters=filters)
df = ds.read().to_pandas()
data = pd.DataFrame(orjson.loads(orjson.dumps(df.values.tolist())))
没有网络io。它比直接读取 pandas
慢 50 倍df = ds.read().to_pandas()
有没有办法将 parquet 数据集序列化为我可以通过 http 发送并在客户端解析的二进制字符串?
您可以使用内存柱状格式中的箭头发送数据。它将比 json 更加高效和紧凑。但请记住,它将是二进制数据(与 json 不同,它不是人类可读的)
有关完整示例,请参阅 doc。
在你的情况下你想做这样的事情:
ds = pq.ParquetDataset('/path/to/data.parquet',
use_legacy_dataset=False,
filters=filters)
table = ds.read() # pa.Table
# Write the data:
batches = table.to_batches()
sink = pa.BufferOutputStream()
writer = pa.ipc.new_stream(sink, table.schema)
for batch in batches:
writer.write(batch)
writer.close()
buf = sink.getvalue()
# Read the data:
reader = pa.ipc.open_stream(buf)
read_batches = [b for b in reader]
read_table = pa.Table.from_batches(read_batches)
read_table.to_pandas()