限制 Apache Spark 3.0 结构化流中的批大小 - MicroBatchStream

Limit Batch Size in Apache Spark 3.0 Structured Streaming - MicroBatchStream

我正在编写自定义结构化流媒体源,但无法弄清楚如何限制批量大小。新的 MicroBatchStream 接口提供了 planInputPartitions 方法,该方法使用 latestOffset 的 return 值作为结束偏移量调用。然后 return 对数据进行分区,直到提供的最新偏移量在单个批次中处理。

当我开始一个新的流式查询时,这会导致第一批非常大,因为所有历史数据都被塞进了一个批中。

我已经尝试通过根据已经提交的内容逐渐增加 latestOffset 来手动限制批处理大小。但是,当从检查点重新启动查询时,这会失败,因为尚未提交任何内容。

是否有(明显的)方法来限制流处理批处理大小?

您可以为此使用 SupportsAdmissionControl 界面。这为您提供了 Offset latestOffset(Offset startOffset, ReadLimit limit); 方法,该方法允许您获取 startOffset 实际上是前一批的 endOffset。这样,您可以在计算 return latestOffset 响应之前应用大小限制。根据您的需要,您不一定需要使用 ReadLimit 参数 - 在我们的例子中,我们只是有一个我们使用的预定义阈值。对您来说重要的部分是 startOffset 参数。

然后,将使用正确的开始和结束偏移量调用 planInputPartitions,这是使用您的大小限制计算的。

您可以在 Kafka DataSource implmentation 中看到一个示例 - 请参阅 here