在 Spring Cloud Stream Kafka Binder 中使用批处理时最多重试 3 次
Retry max 3 times when consuming batches in Spring Cloud Stream Kafka Binder
我在 kafka 中使用批处理,其中 spring 云流 kafka 活页夹不支持批处理模式的重试,有一个选项可以配置 SeekToCurrentBatchErrorHandler(使用 ListenerContainerCustomizer)来实现类似的在活页夹中重试的功能。
我试过同样的方法,但使用了 SeekToCurrentBatchErrorHandler,但它重试的次数超过了设置的时间,即 3 次。
我该怎么做?
我想重试整批。
如何将整批发送到 dlq 主题?就像记录监听器一样,我曾经将 deliveryAttempt(retry) 匹配到 3,然后发送到 DLQ 主题,检查监听器。
我已经检查了 this link, which is exactly my issue 但一个例子会很有帮助,有了这个库 spring-cloud-stream-kafka-binder,我能实现吗?请举例说明,我是新手。
目前我有以下代码。
@Configuration
public class ConsumerConfig {
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
return (container, dest, group) -> {
container.getContainerProperties().setAckOnError(false);
SeekToCurrentBatchErrorHandler seekToCurrentBatchErrorHandler
= new SeekToCurrentBatchErrorHandler();
seekToCurrentBatchErrorHandler.setBackOff(new FixedBackOff(0L, 2L));
container.setBatchErrorHandler(seekToCurrentBatchErrorHandler);
//container.setBatchErrorHandler(new BatchLoggingErrorHandler());
};
}
}
听众:
@StreamListener(ActivityChannel.INPUT_CHANNEL)
public void handleActivity(List<Message<Event>> messages,
@Header(name = KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment
acknowledgment,
@Header(name = "deliveryAttempt", defaultValue = "1") int
deliveryAttempt) {
try {
log.info("Received activity message with message length {}", messages.size());
nodeConfigActivityBatchProcessor.processNodeConfigActivity(messages);
acknowledgment.acknowledge();
log.debug("Processed activity message {} successfully!!", messages.size());
} catch (MessagePublishException e) {
if (deliveryAttempt == 3) {
log.error(
String.format("Exception occurred, sending the message=%s to DLQ due to: ",
"message"),
e);
publisher.publishToDlq(EventType.UPDATE_FAILED, "message", e.getMessage());
} else {
throw e;
}
}
}
看到@Gary 的回复后,添加了带有 RetryingBatchErrorHandler 的 ListenerContainerCustomizer @Bean,但无法导入 class。附上屏幕截图。
not able to import RetryingBatchErrorHandler
my spring cloud dependencies
使用 RetryingBatchErrorHandler
将整个批次发送到 DLT
https://docs.spring.io/spring-kafka/docs/current/reference/html/#retrying-batch-eh
使用 RecoveringBatchErrorHandler
,您可以在其中抛出 BatchListenerFailedException
来告诉它批处理中的哪条记录失败。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#recovering-batch-eh
在这两种情况下都向错误处理程序提供 DeadLetterPublishingRecoverer
;在活页夹中禁用分布式账本技术。
编辑
这是一个例子;它使用较新的函数式样式而不是已弃用的 @StreamListener
,但适用相同的概念(但您应该考虑转向函数式样式)。
@SpringBootApplication
public class So69175145Application {
public static void main(String[] args) {
SpringApplication.run(So69175145Application.class, args);
}
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(
KafkaTemplate<byte[], byte[]> template) {
return (container, dest, group) -> {
container.setBatchErrorHandler(new RetryingBatchErrorHandler(new FixedBackOff(5000L, 2L),
new DeadLetterPublishingRecoverer(template,
(rec, ex) -> new TopicPartition("errors." + dest + "." + group, rec.partition()))));
};
}
/*
* DLT topic won't be auto-provisioned since enableDlq is false
*/
@Bean
public NewTopic topic() {
return TopicBuilder.name("errors.so69175145.grp").partitions(1).replicas(1).build();
}
/*
* Functional equivalent of @StreamListener
*/
@Bean
public Consumer<List<String>> input() {
return list -> {
System.out.println(list);
throw new RuntimeException("test");
};
}
/*
* Not needed here - just to show we sent them to the DLT
*/
@KafkaListener(id = "so69175145", topics = "errors.so69175145.grp")
public void listen(String in) {
System.out.println("From DLT: " + in);
}
}
spring.cloud.stream.bindings.input-in-0.destination=so69175145
spring.cloud.stream.bindings.input-in-0.group=grp
spring.cloud.stream.bindings.input-in-0.content-type=text/plain
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
# for DLT listener
spring.kafka.consumer.auto-offset-reset=earliest
[foo]
2021-09-14 09:55:32.838ERROR...
...
[foo]
2021-09-14 09:55:37.873ERROR...
...
[foo]
2021-09-14 09:55:42.886ERROR...
...
From DLT: foo
我在 kafka 中使用批处理,其中 spring 云流 kafka 活页夹不支持批处理模式的重试,有一个选项可以配置 SeekToCurrentBatchErrorHandler(使用 ListenerContainerCustomizer)来实现类似的在活页夹中重试的功能。
我试过同样的方法,但使用了 SeekToCurrentBatchErrorHandler,但它重试的次数超过了设置的时间,即 3 次。
我该怎么做? 我想重试整批。
如何将整批发送到 dlq 主题?就像记录监听器一样,我曾经将 deliveryAttempt(retry) 匹配到 3,然后发送到 DLQ 主题,检查监听器。
我已经检查了 this link, which is exactly my issue 但一个例子会很有帮助,有了这个库 spring-cloud-stream-kafka-binder,我能实现吗?请举例说明,我是新手。
目前我有以下代码。
@Configuration
public class ConsumerConfig {
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
return (container, dest, group) -> {
container.getContainerProperties().setAckOnError(false);
SeekToCurrentBatchErrorHandler seekToCurrentBatchErrorHandler
= new SeekToCurrentBatchErrorHandler();
seekToCurrentBatchErrorHandler.setBackOff(new FixedBackOff(0L, 2L));
container.setBatchErrorHandler(seekToCurrentBatchErrorHandler);
//container.setBatchErrorHandler(new BatchLoggingErrorHandler());
};
}
}
听众:
@StreamListener(ActivityChannel.INPUT_CHANNEL)
public void handleActivity(List<Message<Event>> messages,
@Header(name = KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment
acknowledgment,
@Header(name = "deliveryAttempt", defaultValue = "1") int
deliveryAttempt) {
try {
log.info("Received activity message with message length {}", messages.size());
nodeConfigActivityBatchProcessor.processNodeConfigActivity(messages);
acknowledgment.acknowledge();
log.debug("Processed activity message {} successfully!!", messages.size());
} catch (MessagePublishException e) {
if (deliveryAttempt == 3) {
log.error(
String.format("Exception occurred, sending the message=%s to DLQ due to: ",
"message"),
e);
publisher.publishToDlq(EventType.UPDATE_FAILED, "message", e.getMessage());
} else {
throw e;
}
}
}
看到@Gary 的回复后,添加了带有 RetryingBatchErrorHandler 的 ListenerContainerCustomizer @Bean,但无法导入 class。附上屏幕截图。
not able to import RetryingBatchErrorHandler
my spring cloud dependencies
使用 RetryingBatchErrorHandler
将整个批次发送到 DLT
https://docs.spring.io/spring-kafka/docs/current/reference/html/#retrying-batch-eh
使用 RecoveringBatchErrorHandler
,您可以在其中抛出 BatchListenerFailedException
来告诉它批处理中的哪条记录失败。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#recovering-batch-eh
在这两种情况下都向错误处理程序提供 DeadLetterPublishingRecoverer
;在活页夹中禁用分布式账本技术。
编辑
这是一个例子;它使用较新的函数式样式而不是已弃用的 @StreamListener
,但适用相同的概念(但您应该考虑转向函数式样式)。
@SpringBootApplication
public class So69175145Application {
public static void main(String[] args) {
SpringApplication.run(So69175145Application.class, args);
}
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(
KafkaTemplate<byte[], byte[]> template) {
return (container, dest, group) -> {
container.setBatchErrorHandler(new RetryingBatchErrorHandler(new FixedBackOff(5000L, 2L),
new DeadLetterPublishingRecoverer(template,
(rec, ex) -> new TopicPartition("errors." + dest + "." + group, rec.partition()))));
};
}
/*
* DLT topic won't be auto-provisioned since enableDlq is false
*/
@Bean
public NewTopic topic() {
return TopicBuilder.name("errors.so69175145.grp").partitions(1).replicas(1).build();
}
/*
* Functional equivalent of @StreamListener
*/
@Bean
public Consumer<List<String>> input() {
return list -> {
System.out.println(list);
throw new RuntimeException("test");
};
}
/*
* Not needed here - just to show we sent them to the DLT
*/
@KafkaListener(id = "so69175145", topics = "errors.so69175145.grp")
public void listen(String in) {
System.out.println("From DLT: " + in);
}
}
spring.cloud.stream.bindings.input-in-0.destination=so69175145
spring.cloud.stream.bindings.input-in-0.group=grp
spring.cloud.stream.bindings.input-in-0.content-type=text/plain
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
# for DLT listener
spring.kafka.consumer.auto-offset-reset=earliest
[foo]
2021-09-14 09:55:32.838ERROR...
...
[foo]
2021-09-14 09:55:37.873ERROR...
...
[foo]
2021-09-14 09:55:42.886ERROR...
...
From DLT: foo