Spring 时间内的 Kafka 批处理 window
Spring Kafka batch within time window
Spring 监听kafka主题的启动环境(@KafkaListener / @StreamListener)
配置侦听器工厂以批处理模式运行:
ConcurrentKafkaListenerContainerFactory # setBatchListener
或通过application.properties
:
spring.kafka.listener.type=batch
如何配置框架以便给定两个数字:N 和 T,它会尝试为侦听器获取 N 条记录,但不会等待超过 T 秒,如下所述:https://doc.akka.io/docs/akka/2.5/stream/operators/Source-or-Flow/groupedWithin.html
我看过的一些属性:
max-poll-records
确保您不会在一个批次中获得超过 N 个号码
fetch-min-size
在获取请求中至少获取此数量的数据
fetch-max-wait
但不要等待超过必要的时间
idleBetweenPolls
在民意调查之间睡一会儿 :)
似乎 fetch-min-size
与 fetch-max-wait
结合应该可以,但他们比较的是字节,而不是 messages/records。
显然可以手动实现,我正在寻找是否可以为我配置 Spring。
It seems like fetch-min-size
combined with fetch-max-wait
should do it but they compare bytes, not messages/records.
没错,遗憾的是,Kafka没有提供fetch.min.records
.
这样的机制
我预计 Spring 不会在 kafka 客户端之上添加此功能;最好在 Kafka 本身中请求一个新功能。
Spring 根本不处理从轮询返回的记录,除非您现在可以指定 subBatchPerPartition
以获取仅包含一个分区的批次,以便在使用 exactly once 时正确支持僵尸防护read/prcess/write.
Spring 监听kafka主题的启动环境(@KafkaListener / @StreamListener) 配置侦听器工厂以批处理模式运行:
ConcurrentKafkaListenerContainerFactory # setBatchListener
或通过application.properties
:
spring.kafka.listener.type=batch
如何配置框架以便给定两个数字:N 和 T,它会尝试为侦听器获取 N 条记录,但不会等待超过 T 秒,如下所述:https://doc.akka.io/docs/akka/2.5/stream/operators/Source-or-Flow/groupedWithin.html
我看过的一些属性:
max-poll-records
确保您不会在一个批次中获得超过 N 个号码fetch-min-size
在获取请求中至少获取此数量的数据fetch-max-wait
但不要等待超过必要的时间idleBetweenPolls
在民意调查之间睡一会儿 :)
似乎 fetch-min-size
与 fetch-max-wait
结合应该可以,但他们比较的是字节,而不是 messages/records。
显然可以手动实现,我正在寻找是否可以为我配置 Spring。
It seems like
fetch-min-size
combined withfetch-max-wait
should do it but they compare bytes, not messages/records.
没错,遗憾的是,Kafka没有提供fetch.min.records
.
我预计 Spring 不会在 kafka 客户端之上添加此功能;最好在 Kafka 本身中请求一个新功能。
Spring 根本不处理从轮询返回的记录,除非您现在可以指定 subBatchPerPartition
以获取仅包含一个分区的批次,以便在使用 exactly once 时正确支持僵尸防护read/prcess/write.