有没有一种方法可以控制 Apache Arrow Batch Sizes?
Is there a method to control Apache Arrow Batch Sizes?
我想了解是否有一种机制可以控制从服务器发送到客户端的批量大小。
我已经从 Github repo 和一个基本的 F# 客户端实现了 python 服务器。
作为测试,我添加了一个包含 100 万行的广告投放,我想将其发送回客户端。
起初,客户端因以下 GRPC 异常而失败。
One or more errors occurred. (Status(StatusCode="ResourceExhausted", Detail="Received message exceeds the maximum configured message size."))
按照提示,已超出邮件大小。作为修复,我可以将允许的最大 grpc 消息大小设置为无限制,即
let ops = new GrpcChannelOptions()
ops.MaxReceiveMessageSize <- Nullable()
let downloadChannel = GrpcChannel.ForAddress(uri, ops)
let downloadClient = new FlightClient(download_channel)
但是,我想了解是否有一种方法可以设置从服务器发送到客户端的批量大小,即在服务器的 do_get 方法中
def do_get(self, context, ticket):
key = ast.literal_eval(ticket.ticket.decode())
if key not in self.flights:
return None
return pyarrow.flight.RecordBatchStream(self.flights[key])
我想在创建时设置批量大小 pyarrow.flight.RecordBatchStream。查看 documentation,使用 pyarrow.ipc.IpcWriteOptions 指定的选项不允许设置批量大小?
在此先感谢您的帮助:)
UPDATE - 请参阅下面已接受的答案,这使我走上了正确的道路。我已按如下方式更新我的代码以解决此问题。
def do_get(self, context, ticket):
key = ast.literal_eval(ticket.ticket.decode())
if key not in self.flights:
return None
reader = pyarrow.ipc.RecordBatchReader().from_batches(self.flights[key].schema, pyarrow.Table.to_batches(self.flights[key]))
return pyarrow.flight.RecordBatchStream(reader)
假设 self.flights[key]
是 pyarrow.Table
,您可以 re-chunk 提前使用 Table.to_batches
。 (这不会复制数据,它只会 re-slice 底层数组。)
请注意,大小以 行 为单位,根据数据类型的不同,可能与 字节 的大小不符。这是一个不幸的不匹配。您可以使用 get_total_buffer_size
来(便宜地)估计字节大小并根据需要进一步拆分批次(尽管如果您有类似单个 4MB 的字符串,那您就不走运了)。
我想了解是否有一种机制可以控制从服务器发送到客户端的批量大小。
我已经从 Github repo 和一个基本的 F# 客户端实现了 python 服务器。
作为测试,我添加了一个包含 100 万行的广告投放,我想将其发送回客户端。 起初,客户端因以下 GRPC 异常而失败。
One or more errors occurred. (Status(StatusCode="ResourceExhausted", Detail="Received message exceeds the maximum configured message size."))
按照提示,已超出邮件大小。作为修复,我可以将允许的最大 grpc 消息大小设置为无限制,即
let ops = new GrpcChannelOptions()
ops.MaxReceiveMessageSize <- Nullable()
let downloadChannel = GrpcChannel.ForAddress(uri, ops)
let downloadClient = new FlightClient(download_channel)
但是,我想了解是否有一种方法可以设置从服务器发送到客户端的批量大小,即在服务器的 do_get 方法中
def do_get(self, context, ticket):
key = ast.literal_eval(ticket.ticket.decode())
if key not in self.flights:
return None
return pyarrow.flight.RecordBatchStream(self.flights[key])
我想在创建时设置批量大小 pyarrow.flight.RecordBatchStream。查看 documentation,使用 pyarrow.ipc.IpcWriteOptions 指定的选项不允许设置批量大小?
在此先感谢您的帮助:)
UPDATE - 请参阅下面已接受的答案,这使我走上了正确的道路。我已按如下方式更新我的代码以解决此问题。
def do_get(self, context, ticket):
key = ast.literal_eval(ticket.ticket.decode())
if key not in self.flights:
return None
reader = pyarrow.ipc.RecordBatchReader().from_batches(self.flights[key].schema, pyarrow.Table.to_batches(self.flights[key]))
return pyarrow.flight.RecordBatchStream(reader)
假设 self.flights[key]
是 pyarrow.Table
,您可以 re-chunk 提前使用 Table.to_batches
。 (这不会复制数据,它只会 re-slice 底层数组。)
请注意,大小以 行 为单位,根据数据类型的不同,可能与 字节 的大小不符。这是一个不幸的不匹配。您可以使用 get_total_buffer_size
来(便宜地)估计字节大小并根据需要进一步拆分批次(尽管如果您有类似单个 4MB 的字符串,那您就不走运了)。