How to decode Arrow Flight `FlightData` with a pure gRPC client

I ran into a situation where I needed to use a plain gRPC client (via the grpc.aio API) to talk to an Arrow Flight gRPC server.

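The DoGet call does reach the server, and we receive FlightData responses. If we understand the Flight gRPC definition correctly, each response contains a flatbuffers message that can somehow be decoded into a RecordBatch.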

Here is the client code:

import asyncio
import pathlib

import grpc
import pyarrow as pa
import pyarrow.flight as pf

# Python stubs generated from Arrow's Flight.proto (e.g. with grpcio-tools)
import flight_pb2, flight_pb2_grpc

async def main():
    ticket = pf.Ticket("tick")
    sock_file = pathlib.Path.cwd().joinpath("arena.sock").resolve()
    async with grpc.aio.insecure_channel(f"unix://{sock_file}") as channel:
        stub = flight_pb2_grpc.FlightServiceStub(channel)
        async for data in stub.DoGet(flight_pb2.Ticket(ticket=ticket.ticket)):
            assert type(data) is flight_pb2.FlightData
            print(data)
            # How to convert data into a RecordBatch?

asyncio.run(main())

We are currently stuck at the last step: decoding the FlightData response.

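The question has two parts:

  1. Is there an existing facility in pyarrow.flight that we can use to decode a Python gRPC object of type FlightData?
  2. If #1 is not possible, what other options are there for decoding the contents of FlightData and rebuilding the RecordBatch from scratch?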

The main interest here is using AsyncIO with a plain gRPC client. As far as we can tell, this is not possible with the current version of the Arrow Flight gRPC client.

There is indeed no utility for this exposed in pyarrow.flight.

FlightData contains, among other fields, the Arrow IPC header and body, so you can decode it with pyarrow.ipc once you re-add the IPC message prefix. Here is an example:

import asyncio
import pathlib
import struct

import grpc
import pyarrow as pa
import pyarrow.flight as pf

import Flight_pb2, Flight_pb2_grpc

async def main():
    ticket = pf.Ticket("tick")
    async with grpc.aio.insecure_channel("localhost:1234") as channel:
        stub = Flight_pb2_grpc.FlightServiceStub(channel)
        schema = None
        async for data in stub.DoGet(Flight_pb2.Ticket(ticket=ticket.ticket)):
            # 4 bytes: IPC continuation token (0xFFFFFFFF)
            token = b'\xff\xff\xff\xff'
            # 4 bytes: message length (little-endian)
            length = struct.pack('<I', len(data.data_header))
            buf = pa.py_buffer(token + length + data.data_header + data.data_body)
            message = pa.ipc.read_message(buf)
            print(message)
            if schema is None:
                # Passing the Message should work, but was unimplemented in pyarrow at the time
                # print(pa.ipc.read_schema(message))
                schema = pa.ipc.read_schema(buf)
                print(schema)
            else:
                batch = pa.ipc.read_record_batch(message, schema)
                print(batch)
                print(batch.to_pydict())

asyncio.run(main())

Server:

import pyarrow.flight as flight
import pyarrow as pa

class TestServer(flight.FlightServerBase):
    def do_get(self, context, ticket):
        table = pa.table([[1,2,3,4]], names=["a"])
        return flight.RecordBatchStream(table)

TestServer("grpc://localhost:1234").serve()

There has been some discussion of an async Flight API; if you're interested in taking part, please join the dev@ mailing list.