如何使用纯 gRPC 客户端解码 Arrow Flight `FlightData`
How to decode Arrow Flight `FlightData` with a pure gRPC client
我遇到过需要使用普通 gRPC 客户端的情况(通过 grpc.aio
API) to talk to an Arrow Flight gRPC 服务器。
DoGet
调用确实到达了服务器,我们收到了 FlightData
响应。如果我们对 Flight gRPC
definition 的理解是正确的,则响应包含一条 flatbuffers
消息,可以以某种方式将其解码为 RecordBatch
.
以下是客户端代码,
import asyncio
import pathlib
import grpc
import pyarrow as pa
import pyarrow.flight as pf
import flight_pb2, flight_pb2_grpc
async def main():
ticket = pf.Ticket("tick")
sock_file = pathlib.Path.cwd().joinpath("arena.sock").resolve()
async with grpc.aio.insecure_channel(f"unix://{sock_file}") as channel:
stub = flight_pb2_grpc.FlightServiceStub(channel)
async for data in stub.DoGet(flight_pb2.Ticket(ticket=ticket.ticket)):
assert type(data) is flight_pb2.FlightData
print(data)
# How to convert data into a RecordBatch?
asyncio.run(main())
目前我们停留在解码 FlightData
响应的最后一步。
问题有两层,
- 是否有一些现有的设施形式
pyarrow.flight
我们可以用来解码 FlightData
类型的 python grpc
对象;
- 如果 #1 不可能,还有哪些其他选项可以解码
FlightData
的内容并从头开始重建 RecordBatch
?
这里的主要兴趣是使用普通 gRPC 客户端的 AsyncIO。据推测,这对于当前版本的 Arrow Flight gRPC 客户端是不可行的。
pyarrow.flight 中确实没有针对此公开的实用程序。
ArrowData 包含 Arrow IPC header 和 body 等内容。所以你可以使用 pyarrow.ipc
来解码它。这是一个例子:
import asyncio
import pathlib
import struct
import grpc
import pyarrow as pa
import pyarrow.flight as pf
import Flight_pb2, Flight_pb2_grpc
async def main():
ticket = pf.Ticket("tick")
async with grpc.aio.insecure_channel("localhost:1234") as channel:
stub = Flight_pb2_grpc.FlightServiceStub(channel)
schema = None
async for data in stub.DoGet(Flight_pb2.Ticket(ticket=ticket.ticket)):
# 4 bytes: Need IPC continuation token
token = b'\xff\xff\xff\xff'
# 4 bytes: message length (little-endian)
length = struct.pack('<I', len(data.data_header))
buf = pa.py_buffer(token + length + data.data_header + data.data_body)
message = pa.ipc.read_message(buf)
print(message)
if schema is None:
# This should work but is unimplemented
# print(pa.ipc.read_schema(message))
schema = pa.ipc.read_schema(buf)
print(schema)
else:
batch = pa.ipc.read_record_batch(message, schema)
print(batch)
print(batch.to_pydict())
asyncio.run(main())
服务器:
import pyarrow.flight as flight
import pyarrow as pa
class TestServer(flight.FlightServerBase):
def do_get(self, context, ticket):
table = pa.table([[1,2,3,4]], names=["a"])
return flight.RecordBatchStream(table)
TestServer("grpc://localhost:1234").serve()
有一些关于异步飞行 API 的讨论,如果您想参与,请加入 dev@ mailing list。
我遇到过需要使用普通 gRPC 客户端的情况(通过 grpc.aio
API) to talk to an Arrow Flight gRPC 服务器。
DoGet
调用确实到达了服务器,我们收到了 FlightData
响应。如果我们对 Flight gRPC
definition 的理解是正确的,则响应包含一条 flatbuffers
消息,可以以某种方式将其解码为 RecordBatch
.
以下是客户端代码,
import asyncio
import pathlib
import grpc
import pyarrow as pa
import pyarrow.flight as pf
import flight_pb2, flight_pb2_grpc
async def main():
ticket = pf.Ticket("tick")
sock_file = pathlib.Path.cwd().joinpath("arena.sock").resolve()
async with grpc.aio.insecure_channel(f"unix://{sock_file}") as channel:
stub = flight_pb2_grpc.FlightServiceStub(channel)
async for data in stub.DoGet(flight_pb2.Ticket(ticket=ticket.ticket)):
assert type(data) is flight_pb2.FlightData
print(data)
# How to convert data into a RecordBatch?
asyncio.run(main())
目前我们停留在解码 FlightData
响应的最后一步。
问题有两层,
- 是否有一些现有的设施形式
pyarrow.flight
我们可以用来解码FlightData
类型的 pythongrpc
对象; - 如果 #1 不可能,还有哪些其他选项可以解码
FlightData
的内容并从头开始重建RecordBatch
?
这里的主要兴趣是使用普通 gRPC 客户端的 AsyncIO。据推测,这对于当前版本的 Arrow Flight gRPC 客户端是不可行的。
pyarrow.flight 中确实没有针对此公开的实用程序。
ArrowData 包含 Arrow IPC header 和 body 等内容。所以你可以使用 pyarrow.ipc
来解码它。这是一个例子:
import asyncio
import pathlib
import struct
import grpc
import pyarrow as pa
import pyarrow.flight as pf
import Flight_pb2, Flight_pb2_grpc
async def main():
ticket = pf.Ticket("tick")
async with grpc.aio.insecure_channel("localhost:1234") as channel:
stub = Flight_pb2_grpc.FlightServiceStub(channel)
schema = None
async for data in stub.DoGet(Flight_pb2.Ticket(ticket=ticket.ticket)):
# 4 bytes: Need IPC continuation token
token = b'\xff\xff\xff\xff'
# 4 bytes: message length (little-endian)
length = struct.pack('<I', len(data.data_header))
buf = pa.py_buffer(token + length + data.data_header + data.data_body)
message = pa.ipc.read_message(buf)
print(message)
if schema is None:
# This should work but is unimplemented
# print(pa.ipc.read_schema(message))
schema = pa.ipc.read_schema(buf)
print(schema)
else:
batch = pa.ipc.read_record_batch(message, schema)
print(batch)
print(batch.to_pydict())
asyncio.run(main())
服务器:
import pyarrow.flight as flight
import pyarrow as pa
class TestServer(flight.FlightServerBase):
def do_get(self, context, ticket):
table = pa.table([[1,2,3,4]], names=["a"])
return flight.RecordBatchStream(table)
TestServer("grpc://localhost:1234").serve()
有一些关于异步飞行 API 的讨论,如果您想参与,请加入 dev@ mailing list。