无法在 Spring Kafka 中找到处理恢复的正确方法

Unable to find the proper way to handle recovery in Spring Kafka

我正在开发一个相当简单的 Spring 引导应用程序,它将处理来自单个主题的消息,然后为每条消息调用外部 Web 服务。我希望这个服务对错误有点聪明,所以如果外部网站在短时间内不可用,记录应该用指数退避重试,直到我们放弃并记录错误并提交的某个点记录。

我正在使用 Spring Boot 和 Spring Kafka 2.3.3。

我将向您展示一些针对此设置的 Spring 配置。为简洁起见,省略了一些内容。询问是否有一些可能有用的值或其他配置。

@Configuration
@EnableKafka
public class SpringConfiguration {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
        // .. Misc other properties related to serialisation etc ..
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public SeekToCurrentErrorHandler eh() {
        long initialMillis = 500;
        long factor = 2;
        long maxElapsedTimeSecs = 60;
        ExponentialBackOff backoff = new ExponentialBackOff(initialMillis, factor);
        backoff.setMaxElapsedTime(maxElapsedTimeSecs*1000);

        BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer = (rec, exc) ->  {
           // TODO In the final app do something more useful here
           logger.error("* Maximum retry policy has been reached {} - acknowledging and proceeding *", rec);
        };

        SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, backoff);
        eh.setCommitRecovered(true);
        return eh;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(RetryTemplate retryTemplate) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setMissingTopicsFatal(missingTopicsFatal); // True in prod, false otherwise

 factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setSyncCommitTimeout(Duration.ofSeconds(60));
        factory.setStatefulRetry(true);
        factory.setErrorHandler(eh());
        return factory;
    }
}

我的监听器很简单:

@KafkaListener(topics = "${kafka.input_topic}")
public void handle(ConsumerRecord<String, SendToEBoksMessage> record, Acknowledgment acknowledgment) throws Exception {
    logger.info("Listener invoked");
    // TODO Right so simulate some sort of problem. External web service not available, for example.
    throw new Exception("Exception of some kind");
}

但似乎 ExponentialBackoff 参数对 SeekToCurrentErrorHandler 引入的延迟增加导致 Kafka 发生重新平衡。重试几次后,日志显示正在重新平衡:

...
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void MyListenerClass.handle(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, MyMessageClass>,org.springframework.kafka.support.Acknowledgment) throws java.lang.Exception' threw exception; nested exception is java.lang.Exception: Exception of some kind; nested exception is java.lang.Exception: Exception of some kind
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1686)
    ... 10 more
Caused by: java.lang.Exception: Exception of some kind
    at MyListenerClass.handle(SendToEboksMessageKafkaListener.java:20)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

2019-12-16 12:49:04.364  INFO michael-laptop --- [ntainer#0-0-C-1] o.a.k.c.c.i.ConsumerCoordinator          : [Consumer clientId=consumer-1, groupId=mygroupid] Revoking previously assigned partitions [MyTopic-0]
2019-12-16 12:49:04.365  INFO michael-laptop --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : MyTopic: partitions revoked: [MyTopic-0]
2019-12-16 12:49:04.365  INFO michael-laptop --- [ntainer#0-0-C-1] o.a.k.c.c.i.AbstractCoordinator          : [Consumer clientId=consumer-1, groupId=mygroupid] (Re-)joining group
2019-12-16 12:49:04.373  INFO michael-laptop --- [ntainer#0-0-C-1] o.a.k.c.c.i.AbstractCoordinator          : [Consumer clientId=consumer-1, groupId=mygroupid] Successfully joined group with generation 18
2019-12-16 12:49:04.373  INFO michael-laptop --- [ntainer#0-0-C-1] o.a.k.c.c.i.ConsumerCoordinator          : [Consumer clientId=consumer-1, groupId=mygroupid] Setting newly assigned partitions: 
2019-12-16 12:49:04.373  INFO michael-laptop --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : ...

我不明白为什么会这样。根据我的阅读,以这种方式使用错误处理程序进行恢复会导致重试由容器处理,并避免不经常调用 consumer.poll() 来满足 max.poll.ms 属性 的潜在问题.

谁能告诉我我做错了什么?

*** 更新:

我在 Kafka 代理日志中看到以下内容:

[2019-12-17 14:13:22,714] INFO [GroupCoordinator 1001]: Preparing to rebalance group MyGroup1 in state PreparingRebalance with old generation 0 (__consumer_offsets-37) (reason: Adding new member consumer-1-2d76a488-3677-4294-9aed-c153f0dca66c with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
[2019-12-17 14:13:22,722] INFO [GroupCoordinator 1001]: Stabilized group MyGroup1 generation 1 (__consumer_offsets-37) (kafka.coordinator.group.GroupCoordinator)
[2019-12-17 14:13:22,735] INFO [GroupCoordinator 1001]: Assignment received from leader for group MyGroup1 for generation 1 (kafka.coordinator.group.GroupCoordinator)
[2019-12-17 14:14:18,096] INFO [GroupCoordinator 1001]: Preparing to rebalance group MyGroup1 in state PreparingRebalance with old generation 1 (__consumer_offsets-37) (reason: Adding new member consumer-1-addbdcfd-21ed-44fa-9d17-b10c7c67f07f with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
[2019-12-17 14:14:20,161] INFO [GroupCoordinator 1001]: Stabilized group MyGroup1 generation 2 (__consumer_offsets-37) (kafka.coordinator.group.GroupCoordinator)
[2019-12-17 14:14:20,163] INFO [GroupCoordinator 1001]: Assignment received from leader for group MyGroup1 for generation 2 (kafka.coordinator.group.GroupCoordinator)

这是我前段时间写的一个应用;我将它更新为 Boot 2.2.2,它工作正常:

@SpringBootApplication
public class Kgh1234Application {

    public static void main(String[] args) {
        SpringApplication.run(Kgh1234Application.class, args);
    }


    @KafkaListener(id = "kgh1234", topics = "kgh1234")
    public void listen(String in) {
        System.out.println(in);
        if (in.endsWith("5")) {
            throw new RuntimeException("fail");
        }
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(0L, 2L)));
        return factory;
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("kgh1234", 32, (short) 1);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send("kgh1234", "foo" + i));
        };
    }

}

spring.kafka.consumer.auto-offset-reset=earliest

它只使用默认的恢复器,当重试次数用尽时只记录日志。

foo5

2019-12-17 10:50:32.018 ERROR 32052 --- [ kgh1234-0-C-1] o.s.k.l.SeekToCurrentErrorHandler : Backoff FixedBackOff{interval=0, currentAttempts=3, maxAttempts=2} exhausted for ConsumerRecord(topic = kgh1234, partition = 1, leaderEpoch = 0, offset = 0, CreateTime = 1576597830940, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = foo5)

嗯,解决方案与我的预期完全不同。

我的 Spring 引导应用程序工作正常,包括我用来构建 SeekToCurrentErrorHandler.

的重试策略和恢复处理程序

问题是由于我的测试消息是如何提交到主题的。我会 运行 在控制台中使用

之类的应用程序
$ mvn spring-boot:run

一旦完成并且 运行 成为消费者,我将使用我的 eclipse IDE 进行单元测试 运行 以发布有关该主题的消息。

但是,由于单元测试重用与主应用相同的 Spring 配置,测试不仅会准备好生产者和 post 消息,还会初始化消费者与主应用程序使用的属性完全相同。当第二个消费者加入该组时,就会发生再平衡。测试完成后会发生另一个重新平衡。

一旦你想通了,这一切都是完全合乎逻辑的,并且完全解释了我在控制台应用程序的日志输出中看到的重新平衡。

非常感谢 Gary Russell 的帮助 - 非常感谢!