Spring 高消费者处理时间的 Kafka 配置

Spring Kafka configuration for high consumer processing time

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.2</version>
</dependency>

我有一个 kafka 消费者,处理一条记录需要 30-120 秒。

配置

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, GGP> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, GGP> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(10);
        return factory;
    }

在为可能需要多分钟才能完成记录消费的消费者设置以下配置值时,需要考虑哪些因素。

heartbeat.interval.ms # default 3 seconds
session.timeout.ms # default 45 seconds
request.timeout.ms # default 30 seconds
max.poll.interval.ms # default 5 mins

据我目前的理解,max.poll.interval.ms 应该比单个记录的处理时间长,但长多少(有什么好的经验法则)?我知道这个值越大,从实际故障中恢复所需的时间就越长。但是,如果此值太低,则会过早失败。由于处理时间如此之长,是否需要更改任何其他配置?

max.poll.interval.ms 是消费者轮询之间的最长时间。它应该设置为比轮询期间获取的所有记录的处理时间长的值 (max.poll.records)。 请注意,它还会延迟组重新平衡,因为消费者只会在轮询调用中加入重新平衡。

消费者失败是由消费者发送的心跳决定的。心跳的间隔使用 heartbeat.interval.ms

配置

当消费者session.timeout.ms没有发送心跳时,则认为失败。

非常适合您的用例,

  • session.timeout.ms 应设置为较低的值以更快地检测到故障。但大于heartbeat.interval.ms
  • max.poll.interval.ms 应设置为足以处理 max.poll.records.
  • 的较大值

注意,

session.timeout.ms 值必须在 group.min.session.timeout.msgroup.max.session.timeout.ms 的代理配置中配置的允许范围内。