Spring 时间内的 Kafka 批处理 window

Spring Kafka batch within time window

Spring 监听kafka主题的启动环境(@KafkaListener / @StreamListener) 配置侦听器工厂以批处理模式运行:

ConcurrentKafkaListenerContainerFactory # setBatchListener

或通过application.properties:

spring.kafka.listener.type=batch

如何配置框架以便给定两个数字:N 和 T,它会尝试为侦听器获取 N 条记录,但不会等待超过 T 秒,如下所述:https://doc.akka.io/docs/akka/2.5/stream/operators/Source-or-Flow/groupedWithin.html
我看过的一些属性:

似乎 fetch-min-sizefetch-max-wait 结合应该可以,但他们比较的是字节,而不是 messages/records。

显然可以手动实现,我正在寻找是否可以为我配置 Spring。

It seems like fetch-min-size combined with fetch-max-wait should do it but they compare bytes, not messages/records.

没错,遗憾的是,Kafka没有提供fetch.min.records.

这样的机制

我预计 Spring 不会在 kafka 客户端之上添加此功能;最好在 Kafka 本身中请求一个新功能。

Spring 根本不处理从轮询返回的记录,除非您现在可以指定 subBatchPerPartition 以获取仅包含一个分区的批次,以便在使用 exactly once 时正确支持僵尸防护read/prcess/write.