SeekToCurrentErrorHandler:DeadLetterPublishingRecoverer 未处理反序列化错误
SeekToCurrentErrorHandler: DeadLetterPublishingRecoverer is not handling deserialize errors
我正在尝试使用 spring-kafka 版本 2.3.0.M2 库编写 kafka 消费者。
为了处理 运行 时间错误,我使用 SeekToCurrentErrorHandler.class 和 DeadLetterPublishingRecoverer 作为我的恢复器。这仅在我的消费者代码抛出异常时工作正常,但在无法反序列化消息时失败。
我尝试自己实现 ErrorHandler,我成功了,但通过这种方法,我自己最终编写了 DLT 代码来处理我不想做的错误消息。
下面是我的kafka属性
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: group_id
auto-offset-reset: latest
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
properties:
spring.json.trusted.packages: com.mypackage
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory,
KafkaTemplate<Object, Object> template) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), maxFailures));}
它对我来说工作正常(注意 Boot 将 auto-configure 错误处理程序)...
@SpringBootApplication
public class So56728833Application {
public static void main(String[] args) {
SpringApplication.run(So56728833Application.class, args);
}
@Bean
public SeekToCurrentErrorHandler errorHandler(KafkaTemplate<String, String> template) {
SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3);
eh.setClassifier( // retry for all except deserialization exceptions
new BinaryExceptionClassifier(Collections.singletonList(DeserializationException.class), false));
return eh;
}
@KafkaListener(id = "so56728833"
+ "", topics = "so56728833")
public void listen(Foo in) {
System.out.println(in);
if (in.getBar().equals("baz")) {
throw new IllegalStateException("Test retries");
}
}
@KafkaListener(id = "so56728833dlt", topics = "so56728833.DLT")
public void listenDLT(Object in) {
System.out.println("Received from DLT: " + (in instanceof byte[] ? new String((byte[]) in) : in));
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so56728833").partitions(1).replicas(1).build();
}
@Bean
public NewTopic dlt() {
return TopicBuilder.name("so56728833.DLT").partitions(1).replicas(1).build();
}
public static class Foo {
private String bar;
public Foo() {
super();
}
public Foo(String bar) {
this.bar = bar;
}
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo [bar=" + this.bar + "]";
}
}
}
spring:
kafka:
consumer:
auto-offset-reset: earliest
enable-auto-commit: false
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
properties:
spring.json.trusted.packages: com.example
spring.deserializer.key.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
spring.json.value.default.type: com.example.So56728833Application$Foo
producer:
key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
logging:
level:
org.springframework.kafka: trace
我在主题中有3条记录:
"badJSON"
"{\"bar\":\"baz\"}"
"{\"bar\":\"qux\"}"
我看到第一个直接进入 DLT,第二个在尝试 3 次后进入那里。
我正在尝试使用 spring-kafka 版本 2.3.0.M2 库编写 kafka 消费者。 为了处理 运行 时间错误,我使用 SeekToCurrentErrorHandler.class 和 DeadLetterPublishingRecoverer 作为我的恢复器。这仅在我的消费者代码抛出异常时工作正常,但在无法反序列化消息时失败。
我尝试自己实现 ErrorHandler,我成功了,但通过这种方法,我自己最终编写了 DLT 代码来处理我不想做的错误消息。
下面是我的kafka属性
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: group_id
auto-offset-reset: latest
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
properties:
spring.json.trusted.packages: com.mypackage
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory,
KafkaTemplate<Object, Object> template) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), maxFailures));}
它对我来说工作正常(注意 Boot 将 auto-configure 错误处理程序)...
@SpringBootApplication
public class So56728833Application {
public static void main(String[] args) {
SpringApplication.run(So56728833Application.class, args);
}
@Bean
public SeekToCurrentErrorHandler errorHandler(KafkaTemplate<String, String> template) {
SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3);
eh.setClassifier( // retry for all except deserialization exceptions
new BinaryExceptionClassifier(Collections.singletonList(DeserializationException.class), false));
return eh;
}
@KafkaListener(id = "so56728833"
+ "", topics = "so56728833")
public void listen(Foo in) {
System.out.println(in);
if (in.getBar().equals("baz")) {
throw new IllegalStateException("Test retries");
}
}
@KafkaListener(id = "so56728833dlt", topics = "so56728833.DLT")
public void listenDLT(Object in) {
System.out.println("Received from DLT: " + (in instanceof byte[] ? new String((byte[]) in) : in));
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so56728833").partitions(1).replicas(1).build();
}
@Bean
public NewTopic dlt() {
return TopicBuilder.name("so56728833.DLT").partitions(1).replicas(1).build();
}
public static class Foo {
private String bar;
public Foo() {
super();
}
public Foo(String bar) {
this.bar = bar;
}
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo [bar=" + this.bar + "]";
}
}
}
spring:
kafka:
consumer:
auto-offset-reset: earliest
enable-auto-commit: false
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
properties:
spring.json.trusted.packages: com.example
spring.deserializer.key.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
spring.json.value.default.type: com.example.So56728833Application$Foo
producer:
key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
logging:
level:
org.springframework.kafka: trace
我在主题中有3条记录:
"badJSON"
"{\"bar\":\"baz\"}"
"{\"bar\":\"qux\"}"
我看到第一个直接进入 DLT,第二个在尝试 3 次后进入那里。