以批处理模式使用 Spring Cloud Stream 发送到 DLQ 失败
Failed sending to DLQ using Spring Cloud Stream in batch mode
尝试配置 Spring 在使用批处理模式时将错误消息发送到死信队列。但是结果dlq topic里面什么都没有
我使用 Spring Boot 2.5.3 和 Spring Cloud 2020.0.3。这会自动将 spring-cloud-stream-binder-kafka-parent 的版本解析为 3.1.3.
这里是application.properties:
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
spring.cloud.stream.bindings.input-in-0.content-type=text/plain
spring.cloud.stream.bindings.input-in-0.destination=topic4
spring.cloud.stream.bindings.input-in-0.group=batch4
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=5
这是函数式编程模型中的应用程序和批处理监听器:
@SpringBootApplication
public class DemoKafkaBatchErrorsApplication {
public static void main(String[] args) {
SpringApplication.run(DemoKafkaBatchErrorsApplication.class, args);
}
@Bean
public Consumer<List<byte[]>> input() {
return messages -> {
for (int i = 0; i < messages.size(); i++) {
throw new BatchListenerFailedException("Demo: failed to process = ", i);
}
};
}
@Bean
public RecoveringBatchErrorHandler batchErrorHandler(KafkaTemplate<String, byte[]> template) {
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
return new RecoveringBatchErrorHandler(recoverer, new FixedBackOff(2L, 10));
}
}
发送到主题:
./kafka-console-producer.sh --broker-list broker:9092 --topic topic4 < input.json
正在从 DLQ 读取:
./kafka-console-consumer.sh --bootstrap-server broker:9092 --topic topic4_ERR --from-beginning --max-messages 100
所以在 运行 这个应用程序之后,我在 dlq 主题中什么都没有,但在控制台中有很多消息,例如:
Caused by: org.springframework.kafka.listener.BatchListenerFailedException: Demo: failed to process = @-0
at com.example.demokafkabatcherrors.DemoKafkaBatchErrorsApplication.lambda$input[=15=](DemoKafkaBatchErrorsApplication.java:29) ~[classes/:na]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeConsumer(SimpleFunctionRegistry.java:854) ~[spring-cloud-function-context-3.1.3.jar:3.1.3]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:643) ~[spring-cloud-function-context-3.1.3.jar:3.1.3]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:489) ~[spring-cloud-function-context-3.1.3.jar:3.1.3]
at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:77) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:727) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.handleMessageInternal(FunctionConfiguration.java:560) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56) ~[spring-integration-core-5.5.2.jar:5.5.2]
... 27 common frames omitted
我做错了什么?
更新:
根据 Gary 的回答,我做了以下更改:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BatchErrorHandler handler) {
return ((container, destinationName, group) -> container.setBatchErrorHandler(handler));
}
@Bean
public BatchErrorHandler batchErrorHandler(KafkaOperations<String, byte[]> kafkaOperations) {
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaOperations,
(cr, e) -> new TopicPartition(cr.topic() + "_ERR", 0));
return new RecoveringBatchErrorHandler(recoverer, new FixedBackOff(2L, 3));
}
一切都很顺利
使用spring-cloud-stream时,容器不是由Boot的容器工厂创建的,是由binder创建的;错误处理程序 @Bean
不会自动连接。
您必须改为配置 ListenerContainerCustomizer
@Bean
。
此处示例:Can I apply graceful shutdown when using Spring Cloud Stream Kafka 3.0.3.RELEASE?
尝试配置 Spring 在使用批处理模式时将错误消息发送到死信队列。但是结果dlq topic里面什么都没有
我使用 Spring Boot 2.5.3 和 Spring Cloud 2020.0.3。这会自动将 spring-cloud-stream-binder-kafka-parent 的版本解析为 3.1.3.
这里是application.properties:
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
spring.cloud.stream.bindings.input-in-0.content-type=text/plain
spring.cloud.stream.bindings.input-in-0.destination=topic4
spring.cloud.stream.bindings.input-in-0.group=batch4
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=5
这是函数式编程模型中的应用程序和批处理监听器:
@SpringBootApplication
public class DemoKafkaBatchErrorsApplication {
public static void main(String[] args) {
SpringApplication.run(DemoKafkaBatchErrorsApplication.class, args);
}
@Bean
public Consumer<List<byte[]>> input() {
return messages -> {
for (int i = 0; i < messages.size(); i++) {
throw new BatchListenerFailedException("Demo: failed to process = ", i);
}
};
}
@Bean
public RecoveringBatchErrorHandler batchErrorHandler(KafkaTemplate<String, byte[]> template) {
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
return new RecoveringBatchErrorHandler(recoverer, new FixedBackOff(2L, 10));
}
}
发送到主题:
./kafka-console-producer.sh --broker-list broker:9092 --topic topic4 < input.json
正在从 DLQ 读取:
./kafka-console-consumer.sh --bootstrap-server broker:9092 --topic topic4_ERR --from-beginning --max-messages 100
所以在 运行 这个应用程序之后,我在 dlq 主题中什么都没有,但在控制台中有很多消息,例如:
Caused by: org.springframework.kafka.listener.BatchListenerFailedException: Demo: failed to process = @-0
at com.example.demokafkabatcherrors.DemoKafkaBatchErrorsApplication.lambda$input[=15=](DemoKafkaBatchErrorsApplication.java:29) ~[classes/:na]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeConsumer(SimpleFunctionRegistry.java:854) ~[spring-cloud-function-context-3.1.3.jar:3.1.3]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:643) ~[spring-cloud-function-context-3.1.3.jar:3.1.3]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:489) ~[spring-cloud-function-context-3.1.3.jar:3.1.3]
at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:77) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:727) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.handleMessageInternal(FunctionConfiguration.java:560) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56) ~[spring-integration-core-5.5.2.jar:5.5.2]
... 27 common frames omitted
我做错了什么?
更新: 根据 Gary 的回答,我做了以下更改:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BatchErrorHandler handler) {
return ((container, destinationName, group) -> container.setBatchErrorHandler(handler));
}
@Bean
public BatchErrorHandler batchErrorHandler(KafkaOperations<String, byte[]> kafkaOperations) {
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaOperations,
(cr, e) -> new TopicPartition(cr.topic() + "_ERR", 0));
return new RecoveringBatchErrorHandler(recoverer, new FixedBackOff(2L, 3));
}
一切都很顺利
使用spring-cloud-stream时,容器不是由Boot的容器工厂创建的,是由binder创建的;错误处理程序 @Bean
不会自动连接。
您必须改为配置 ListenerContainerCustomizer
@Bean
。
此处示例:Can I apply graceful shutdown when using Spring Cloud Stream Kafka 3.0.3.RELEASE?