Unable to find the proper way to handle recovery in Spring Kafka
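I'm working on a fairly simple Spring Boot application that processes messages from a single topic and calls an external web service for each message. I want the service to be somewhat smart about errors: if the external site is unavailable for a short time, the record should be retried with exponential backoff until, at some point, we give up, log the error and commit the record.
I'm using Spring Boot and Spring Kafka 2.3.3.
Below is some of the Spring configuration for this setup. Some parts are omitted for brevity; let me know if any other values or configuration would be useful.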
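import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.backoff.ExponentialBackOff;

@Configuration
@EnableKafka
public class SpringConfiguration {

    // Fields such as bootstrap_servers, missingTopicsFatal and logger are omitted for brevity.

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
        // .. Misc other properties related to serialisation etc ..
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public SeekToCurrentErrorHandler eh() {
        long initialMillis = 500;
        long factor = 2;
        long maxElapsedTimeSecs = 60;
        // Retry after 500 ms, 1 s, 2 s, ... (each interval capped at ExponentialBackOff's
        // default 30 s max interval) until the sum of the intervals reaches 60 s.
        ExponentialBackOff backoff = new ExponentialBackOff(initialMillis, factor);
        backoff.setMaxElapsedTime(maxElapsedTimeSecs * 1000);
        // Called once the back-off is exhausted
        BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer = (rec, exc) -> {
            // TODO In the final app do something more useful here
            logger.error("* Maximum retry policy has been reached {} - acknowledging and proceeding *", rec);
        };
        SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, backoff);
        // Commit the offset of the recovered record (requires AckMode.MANUAL_IMMEDIATE)
        eh.setCommitRecovered(true);
        return eh;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(RetryTemplate retryTemplate) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setMissingTopicsFatal(missingTopicsFatal); // True in prod, false otherwise
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setSyncCommitTimeout(Duration.ofSeconds(60));
        // Note: retryTemplate is never used here, and stateful retry only has an
        // effect when a RetryTemplate is set on the factory.
        factory.setStatefulRetry(true);
        factory.setErrorHandler(eh());
        return factory;
    }
}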
My listener is simple:
@KafkaListener(topics = "${kafka.input_topic}")
public void handle(ConsumerRecord<String, SendToEBoksMessage> record, Acknowledgment acknowledgment) throws Exception {
    logger.info("Listener invoked");
    // TODO Right, so simulate some sort of problem. External web service not available, for example.
    throw new Exception("Exception of some kind");
}
But it seems that the growing delay introduced by the ExponentialBackOff parameters of the SeekToCurrentErrorHandler causes Kafka to rebalance. After a few retries, the log shows a rebalance:
...
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void MyListenerClass.handle(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, MyMessageClass>,org.springframework.kafka.support.Acknowledgment) throws java.lang.Exception' threw exception; nested exception is java.lang.Exception: Exception of some kind; nested exception is java.lang.Exception: Exception of some kind
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1686)
... 10 more
Caused by: java.lang.Exception: Exception of some kind
at MyListenerClass.handle(SendToEboksMessageKafkaListener.java:20)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2019-12-16 12:49:04.364 INFO michael-laptop --- [ntainer#0-0-C-1] o.a.k.c.c.i.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=mygroupid] Revoking previously assigned partitions [MyTopic-0]
2019-12-16 12:49:04.365 INFO michael-laptop --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : MyTopic: partitions revoked: [MyTopic-0]
2019-12-16 12:49:04.365 INFO michael-laptop --- [ntainer#0-0-C-1] o.a.k.c.c.i.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=mygroupid] (Re-)joining group
2019-12-16 12:49:04.373 INFO michael-laptop --- [ntainer#0-0-C-1] o.a.k.c.c.i.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=mygroupid] Successfully joined group with generation 18
2019-12-16 12:49:04.373 INFO michael-laptop --- [ntainer#0-0-C-1] o.a.k.c.c.i.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=mygroupid] Setting newly assigned partitions:
2019-12-16 12:49:04.373 INFO michael-laptop --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : ...
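I don't understand why this happens. From what I've read, using the error handler for recovery this way means the retries are handled by the container, which avoids the potential problem of not calling consumer.poll() often enough to satisfy the max.poll.interval.ms property.
Can anyone tell me what I'm doing wrong?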
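To put numbers on the back-off used above, here is a small standalone sketch. `BackOffBudget` is a hypothetical helper that re-implements, as I understand it, the interval sequence of Spring's `ExponentialBackOff`: the first interval is the initial interval, each next one is multiplied and capped at the max interval (30 s by default), and no further interval is produced once the sum of the intervals reaches the max elapsed time. It also assumes, based on my reading of Spring Kafka 2.3.x, that the error handler sleeps for each interval on the consumer thread before seeking and polling again, so it is the longest single interval (plus processing time), not the total, that has to stay below `max.poll.interval.ms` (Kafka's default is 300000 ms).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper mirroring the interval sequence of Spring's ExponentialBackOff. */
public class BackOffBudget {

    /** Kafka's default max.poll.interval.ms (5 minutes). */
    static final long DEFAULT_MAX_POLL_INTERVAL_MS = 300_000L;

    /** Returns the successive sleep intervals produced before the back-off stops. */
    static List<Long> delays(long initialMs, double multiplier, long maxIntervalMs, long maxElapsedMs) {
        if (initialMs <= 0) {
            throw new IllegalArgumentException("initialMs must be positive");
        }
        List<Long> result = new ArrayList<>();
        long elapsed = 0;   // sum of the intervals handed out so far
        long current = -1;  // -1 means "no interval computed yet"
        while (elapsed < maxElapsedMs) {
            if (current < 0) {
                current = Math.min(initialMs, maxIntervalMs);
            } else if (current < maxIntervalMs) {
                current = Math.min((long) (current * multiplier), maxIntervalMs);
            }
            elapsed += current;
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // Values from the question: 500 ms initial, factor 2, 60 s max elapsed time;
        // 30 s is ExponentialBackOff's default max interval.
        List<Long> d = delays(500, 2.0, 30_000, 60_000);
        long total = d.stream().mapToLong(Long::longValue).sum();
        long longest = d.stream().mapToLong(Long::longValue).max().orElse(0);
        System.out.println("delays=" + d + " total=" + total + "ms");
        // Each sleep happens between two polls, so compare the longest single delay.
        System.out.println("longest delay below max.poll.interval.ms: "
                + (longest < DEFAULT_MAX_POLL_INTERVAL_MS));
    }
}
```

With the question's values this gives seven retries (500 ms up to 30 s, roughly 61.5 s in total), and no single sleep comes close to the default five-minute poll interval, which suggests the retry configuration by itself should not trigger a rebalance.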
*** Update:
I see the following in the Kafka broker log:
[2019-12-17 14:13:22,714] INFO [GroupCoordinator 1001]: Preparing to rebalance group MyGroup1 in state PreparingRebalance with old generation 0 (__consumer_offsets-37) (reason: Adding new member consumer-1-2d76a488-3677-4294-9aed-c153f0dca66c with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
[2019-12-17 14:13:22,722] INFO [GroupCoordinator 1001]: Stabilized group MyGroup1 generation 1 (__consumer_offsets-37) (kafka.coordinator.group.GroupCoordinator)
[2019-12-17 14:13:22,735] INFO [GroupCoordinator 1001]: Assignment received from leader for group MyGroup1 for generation 1 (kafka.coordinator.group.GroupCoordinator)
[2019-12-17 14:14:18,096] INFO [GroupCoordinator 1001]: Preparing to rebalance group MyGroup1 in state PreparingRebalance with old generation 1 (__consumer_offsets-37) (reason: Adding new member consumer-1-addbdcfd-21ed-44fa-9d17-b10c7c67f07f with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
[2019-12-17 14:14:20,161] INFO [GroupCoordinator 1001]: Stabilized group MyGroup1 generation 2 (__consumer_offsets-37) (kafka.coordinator.group.GroupCoordinator)
[2019-12-17 14:14:20,163] INFO [GroupCoordinator 1001]: Assignment received from leader for group MyGroup1 for generation 2 (kafka.coordinator.group.GroupCoordinator)
Here is an app I wrote some time ago; I updated it to Boot 2.2.2 and it works fine:
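import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@SpringBootApplication
public class Kgh1234Application {

    public static void main(String[] args) {
        SpringApplication.run(Kgh1234Application.class, args);
    }

    @KafkaListener(id = "kgh1234", topics = "kgh1234")
    public void listen(String in) {
        System.out.println(in);
        // Fail for every record whose value ends in "5" (i.e. foo5)
        if (in.endsWith("5")) {
            throw new RuntimeException("fail");
        }
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        // No delay between attempts, 2 retries; then the default recoverer logs the record
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(0L, 2L)));
        return factory;
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("kgh1234", 32, (short) 1);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        // Send foo0 .. foo9 on startup
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send("kgh1234", "foo" + i));
        };
    }
}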
and
spring.kafka.consumer.auto-offset-reset=earliest
It just uses the default recoverer, which simply logs the record when the retries are exhausted.
foo5
2019-12-17 10:50:32.018 ERROR 32052 --- [ kgh1234-0-C-1] o.s.k.l.SeekToCurrentErrorHandler : Backoff FixedBackOff{interval=0, currentAttempts=3, maxAttempts=2} exhausted for ConsumerRecord(topic = kgh1234, partition = 1, leaderEpoch = 0, offset = 0, CreateTime = 1576597830940, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = foo5)
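The `currentAttempts=3, maxAttempts=2` in that log line follows from how `FixedBackOff` counts: `maxAttempts` is the number of retries, so the failing record is delivered three times before the recoverer runs. The sketch below is a hypothetical re-implementation of that counting, not the Spring classes themselves, and it assumes (my reading of Spring Kafka 2.3.x) that the error handler asks the back-off for a new interval after every failed delivery, including the first one.

```java
/**
 * Hypothetical stand-in for Spring's FixedBackOff execution, used only to
 * show how delivery attempts relate to maxAttempts.
 */
public class FixedBackOffSketch {

    static final long STOP = -1L;

    private final long interval;
    private final long maxAttempts;
    private long currentAttempts;

    FixedBackOffSketch(long interval, long maxAttempts) {
        this.interval = interval;
        this.maxAttempts = maxAttempts;
    }

    /** Returns the delay before the next retry, or STOP once the retries are used up. */
    long nextBackOff() {
        currentAttempts++;
        return currentAttempts <= maxAttempts ? interval : STOP;
    }

    long currentAttempts() {
        return currentAttempts;
    }

    /** Simulates a listener that always throws; returns how often the record was delivered. */
    static int deliveriesUntilRecovery(FixedBackOffSketch backOff) {
        int deliveries = 0;
        while (true) {
            deliveries++; // the listener is invoked and throws
            if (backOff.nextBackOff() == STOP) {
                return deliveries; // retries exhausted: the recoverer would run here
            }
            // otherwise: sleep for the interval, seek back to the record and redeliver it
        }
    }

    public static void main(String[] args) {
        FixedBackOffSketch backOff = new FixedBackOffSketch(0L, 2L);
        int deliveries = deliveriesUntilRecovery(backOff);
        System.out.println("deliveries=" + deliveries + " currentAttempts=" + backOff.currentAttempts());
    }
}
```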
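Well, the solution turned out to be quite different from what I expected.
My Spring Boot application was working fine all along, including the retry policy and the recovery handler I use to build the SeekToCurrentErrorHandler.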
The problem was caused by the way my test messages were posted to the topic. I would run the application in a console with something like
$ mvn spring-boot:run
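Once it was up and running as a consumer, I would run a unit test from my Eclipse IDE to post messages to the topic.
However, because the unit test reuses the same Spring configuration as the main application, the test not only set up a producer and posted messages, it also started a consumer with exactly the same properties as the main application's consumer. When that second consumer joined the group, a rebalance occurred, and another rebalance occurred when the test finished.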
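The empty `Setting newly assigned partitions:` line in the question's log appears consistent with this. The following is a simplified sketch, assuming the Kafka client's default `RangeAssignor` (the default `partition.assignment.strategy` in the 2.x clients) and a single-partition topic like `MyTopic`: members are sorted by member id and each receives a contiguous range of partitions, so with one partition and two members in the same group, one member gets nothing. `RangeAssignmentSketch` and the member ids are hypothetical; real member ids look like `consumer-1-<uuid>`, so which consumer keeps the partition depends on how those ids happen to sort.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical, simplified version of range assignment for a single topic. */
public class RangeAssignmentSketch {

    /** Assigns partitions 0..numPartitions-1 to the members, sorted by member id. */
    static Map<String, List<Integer>> assign(int numPartitions, List<String> memberIds) {
        List<String> members = new ArrayList<>(new TreeSet<>(memberIds));
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int perMember = numPartitions / members.size();
        int withExtra = numPartitions % members.size(); // first members get one extra
        for (int i = 0; i < members.size(); i++) {
            int start = perMember * i + Math.min(i, withExtra);
            int length = perMember + (i < withExtra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                partitions.add(p);
            }
            result.put(members.get(i), partitions);
        }
        return result;
    }

    public static void main(String[] args) {
        // Only the console application is in the group: it owns partition 0.
        System.out.println(assign(1, List.of("app-consumer")));
        // The unit test starts a second consumer with the same group.id:
        // after the rebalance, one of the two members has no partitions at all.
        System.out.println(assign(1, List.of("app-consumer", "test-consumer")));
    }
}
```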
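Once you figure it out, it is all perfectly logical, and it fully explains the rebalances I saw in the log output of the console application.
Many thanks to Gary Russell for his help, much appreciated!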