Pyarrow.flight.do_get 当 pandas 数据帧超过 3GB 时出现段错误

Pyarrow.flight.do_get segfault when pandas Dataframe over 3GB

我的飞行服务器中有两个数据帧:v1 和 v2。 V1 非常小,而 v2 大约 3gb。我可以成功地向服务器请求 v1,但是在请求 v2 时发生段错误。

import numpy.random as rnd
import pandas as pd
import pyarrow as pa
import pyarrow.flight as fl
import numpy as np


class MyFlightServer(FlightServerBase):
    def __init__(self, location=None, options=None, **kwargs):
        super().__init__(location, **kwargs)
        self.tables = {}
        rng = rnd.default_rng()
        df = pd.DataFrame(np.random.standard_normal((1000, 5))).rename(
            columns={k: "col" + str(k) for k in range(5)}
        )
        self.tables[b"v1"] = pa.Table.from_pandas(df)

        df2 = pd.DataFrame(np.random.standard_normal((100000000, 5))).rename(
            columns={k: "col" + str(k) for k in range(5)}
        )
        print(df2.info())
        self.tables[b"v2"] = pa.Table.from_pandas(df2)

    def do_get(self, context, ticket):
        return RecordBatchStream(self.tables[ticket.ticket])


def main():
    with MyFlightServer() as server:
        # This works
        client = fl.connect(("localhost", server.port))
        data = client.do_get(fl.Ticket("v1")).read_pandas()

        # This will get a seg fault
        data = client.do_get(fl.Ticket("v2")).read_pandas()


main()

以上产生以下输出(可能与 df 详细信息相关):

RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
 #   Column  Dtype  
---  ------  -----  
 0   col0    float64
 1   col1    float64
 2   col2    float64
 3   col3    float64
 4   col4    float64
dtypes: float64(5)
memory usage: 3.7 GB
None
Segmentation fault (core dumped)

运行 所在的机器有 64GB 内存,运行 时使用了大约 15GB 内存。因此,我不考虑(也许是天真地)它找不到连续内存块的可能性。

我是在误用航班 server/record 批处理流媒体还是这可能是一个错误?

涉及的版本:

numpy==1.21.0
pandas==1.2.5
pyarrow==4.0.1
python-dateutil==2.8.1
pytz==2021.1
six==1.16.0

Python 3.9.5 (default, May 19 2021, 11:32:47) 
[GCC 9.3.0]

这是 Arrow Flight 中的错误。有关详细信息,请参阅 ARROW-13253。本质上,这是因为 Flight 处理程序将数据作为单个 RecordBatch 发送,但现在,Flight 不支持发送大小大于 2GiB 的记录批次。但是,内部序列化处理程序 returns 在未初始化输出缓冲区的情况下发生错误,gRPC 试图在不检查错误的情况下盲目操作,从而导致崩溃。

您可以通过显式分块数据来解决此问题:

def do_get(self, context, ticket):
    table = self.tables[ticket.ticket]
    batches = table.to_batches(max_chunksize=65536)
    return fl.GeneratorStream(table.schema, batches)