Spring Boot Kafka 客户端有 "Circuit Breaker" 吗?

Is there a "Circuit Breaker" for Spring Boot Kafka client?

万一 Kafka 服务器(暂时)关闭,我的 Spring 引导应用程序 ReactiveKafkaConsumerTemplate 一直尝试连接失败,从而导致不必要的流量并弄乱日志文件:

2021-11-10 14:45:30.265  WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-group-1, groupId=consumer-group] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
2021-11-10 14:45:32.792  WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-group-1, groupId=consumer-group] Bootstrap broker localhost:29092 (id: -1 rack: null) disconnected
2021-11-10 14:45:34.845  WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-group-1, groupId=consumer-group] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
2021-11-10 14:45:34.845  WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-group-1, groupId=consumer-group] Bootstrap broker localhost:29092 (id: -1 rack: null) disconnected

是否可以使用类似断路器的东西(灵感here or here),所以Spring启动Kafka客户端如果发生故障(或者更好的是连续几次失败)减慢连接尝试的速度,并且returns只有在服务器重新启动后才能正常运行?

是否已经有现成的配置参数,或者其他解决方案?

我知道 parameter reconnect.backoff.ms,这就是我创建 ReactiveKafkaConsumerTemplate bean 的方式:

@Bean
public ReactiveKafkaConsumerTemplate<String, MyEvent> kafkaConsumer(KafkaProperties properties) {
    final Map<String, Object> map = new HashMap<>(properties.buildConsumerProperties());
    map.put(ConsumerConfig.GROUP_ID_CONFIG, "MyGroup");
    map.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, 10_000L);
    final JsonDeserializer<DisplayCurrencyEvent> jsonDeserializer = new JsonDeserializer<>();
    jsonDeserializer.addTrustedPackages("com.example.myapplication");

    return new ReactiveKafkaConsumerTemplate<>(
            ReceiverOptions
                    .<String, MyEvent>create(map)
                    .withKeyDeserializer(new ErrorHandlingDeserializer<>(new StringDeserializer()))
                    .withValueDeserializer(new ErrorHandlingDeserializer<>(jsonDeserializer))
                    .subscription(List.of("MyTopic")));
}

消费者仍然每 3 秒尝试连接一次。

https://kafka.apache.org/documentation/#consumerconfigs_retry.backoff.ms

The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker.

https://kafka.apache.org/documentation/#consumerconfigs_reconnect.backoff.max.ms

The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.