Kafka 消费者在中断后不会自动重新连接

Kafka consumer not automatically reconnecting after outage

在我们的基础架构中,我们是具有 3 个节点的 运行 Kafka,并且在 OpenShift 中有多个 spring 引导服务 运行。服务之间的一些通信是通过 Kafka 进行的。对于 consumers/listeners,我们使用具有唯一组 ID 的 @KafkaListener spring 注释,以便每个实例(pod)使用主题的所有分区

@KafkaListener(topics = "myTopic", groupId = "group#{T(java.util.UUID).randomUUID().toString()}")
public void handleMessage(String message) {
    doStuffWithMessage(message);
}

对于配置,我们几乎使用默认值。对于消费者,我们得到的只是

spring.kafka.consumer:
  auto-offset-reset: latest
  value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

有时我们会遇到不幸的情况,我们所有的 Kafka 节点都很快关闭,这将导致消费者取消注册,如 org.apache.kafka.common.utils.AppInfoParser

App info kafka.consumer for consumer-group5c327050-5b05-46fb-a7be-c8d8a20d293a-1 unregistered

一旦节点再次启动,我们预计消费者会再次注册,但事实并非如此。到目前为止,我们不知道为什么他们没有这样做。目前,当出现此问题时,我们被迫重新启动受影响的 pods。以前有没有人遇到过类似的问题或知道我们可能做错了什么?

编辑:我们正在使用以下版本

在 kafka 配置中,您可以使用 reconnect.backoff.max.ms 配置参数来设置重试连接的最大毫秒数。 此外,将参数 reconnect.backoff.ms 设置为重试连接前等待的基本毫秒数。

If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum.

Kafka 文档https://kafka.apache.org/31/documentation/#streamsconfigs

如果您将重新连接的最大毫秒数设置为相当长的时间,例如一天,则将重新尝试连接最多一天(随着时间间隔的增加,50,500,5000,50000 等')。

我们在日志中做了更多挖掘,发现了导致消费者停止的潜在问题。

Authentcation/Authorization Exception and no authExceptionRetryInterval set

显然,消费者在尝试重新连接到当前不可用的 Kafka 节点时收到 Authentcation/Authorization 异常 ,因为我们没有设置 authExceptionRetryInterval 不会有任何重试并且消费者(侦听器容器)已停止。 https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ConsumerProperties.html#setAuthExceptionRetryInterval(java.time.Duration)

Set the interval between retries after and AuthenticationException or org.apache.kafka.common.errors.AuthorizationException is thrown by KafkaConsumer. By default the field is null and retries are disabled. In such case the container will be stopped. The interval must be less than max.poll.interval.ms consumer property.

我们很有信心,设置 authExceptionRetryInterval 将解决我们的问题。