使用 spring boot 为每个主题的 kafka 消费者设置轮询率
Set polling rate for kafka consumers per topic using springboot
是否可以在SpringBoot上为每个kafka主题的消费者设置不同的轮询率?我想让一个主题以更长的间隔(比如 5 分钟)进行轮询。我希望用它来实现对来自 kafka 的消息处理失败的重试。
一个示例实现会有很大帮助。
从 2.3 版本开始,侦听器容器有了新的 属性 idleBetweenPolls
。容器将确保轮询之间的时间不大于 max.poll.interval.ms - 5000
.
您可以使用容器定制器 bean 为特定消费者设置它...
@Component
class Customizer {
public Customizer(ConcurrentKafkaListenerContainerFactory<?, ?> factory, DefaultKafkaProducerFactory<?, ?> pf) {
factory.setContainerCustomizer(container -> {
if (container.getContainerProperties().getGroupId().equals("slowGroup")) {
// or you can use the topic(s)
container.getContainerProperties().setIdleBetweenPolls(60_000);
}
});
}
}
是否可以在SpringBoot上为每个kafka主题的消费者设置不同的轮询率?我想让一个主题以更长的间隔(比如 5 分钟)进行轮询。我希望用它来实现对来自 kafka 的消息处理失败的重试。
一个示例实现会有很大帮助。
从 2.3 版本开始,侦听器容器有了新的 属性 idleBetweenPolls
。容器将确保轮询之间的时间不大于 max.poll.interval.ms - 5000
.
您可以使用容器定制器 bean 为特定消费者设置它...
@Component
class Customizer {
public Customizer(ConcurrentKafkaListenerContainerFactory<?, ?> factory, DefaultKafkaProducerFactory<?, ?> pf) {
factory.setContainerCustomizer(container -> {
if (container.getContainerProperties().getGroupId().equals("slowGroup")) {
// or you can use the topic(s)
container.getContainerProperties().setIdleBetweenPolls(60_000);
}
});
}
}