使用属性文件在 Spring Kafka 中处理错误?
Error handling in Spring Kafka with properties file?
在我的 Kafka Listener
被命中之前,我遇到了一堆反序列化失败。我正在研究 Gary Russel 构建的东西,但在让它工作时遇到了问题。我所有的东西都是通过属性文件配置的。
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
所以如果我添加这些,我的理解是它在消费者记录的 headers 中包装了一个错误?我的最终目标是让任何反序列化异常命中我拥有的一些自定义 class,这样我就可以处理我想用它做的事情。 IE,转发到我的死信处理程序,它将失败的数据上传到 s3。
我尝试将错误处理程序标志添加到 kafkalistener,但这也没有做任何事情。
已更新属性 配置
我已经更新了我的配置,我仍然不清楚这是否正确。它不工作,所以我假设不。
正在调用自定义代码的 None
spring.kafka.consumer.properties.value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.key.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.function=com.thing.cyclic.service.FailedFooProvider
spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicNameStrategy
spring.kafka.consumer.properties.specific.avro.reader=true
spring.kafka.consumer.properties.auto.register.schemas=false
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.listener.ack-mode=manual_immediate
BadFoo
public class BadFoo {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadFoo(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
FailedFooProvider
public class FailedFooProvider implements Function<FailedDeserializationInfo, String> {
@Override
public String apply(FailedDeserializationInfo info) {
System.out.println("");
return "";
}
}
见the documentation here and here。
另请查看 DeadLetterPublishingRecoverer
代码,它可用于将失败的记录发布到其他主题。您可以在此之后为您的代码建模以获得包含失败的 byte[]
.
的 header(s)
恢复器与SeekToCurrentErrorHandler
结合使用。
将错误处理程序配置为 @Bean
并且 Spring 引导会自动将其连接到容器中。
在我的 Kafka Listener
被命中之前,我遇到了一堆反序列化失败。我正在研究 Gary Russel 构建的东西,但在让它工作时遇到了问题。我所有的东西都是通过属性文件配置的。
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
所以如果我添加这些,我的理解是它在消费者记录的 headers 中包装了一个错误?我的最终目标是让任何反序列化异常命中我拥有的一些自定义 class,这样我就可以处理我想用它做的事情。 IE,转发到我的死信处理程序,它将失败的数据上传到 s3。
我尝试将错误处理程序标志添加到 kafkalistener,但这也没有做任何事情。
已更新属性 配置
我已经更新了我的配置,我仍然不清楚这是否正确。它不工作,所以我假设不。
正在调用自定义代码的None
spring.kafka.consumer.properties.value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.key.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.function=com.thing.cyclic.service.FailedFooProvider
spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicNameStrategy
spring.kafka.consumer.properties.specific.avro.reader=true
spring.kafka.consumer.properties.auto.register.schemas=false
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.listener.ack-mode=manual_immediate
BadFoo
public class BadFoo {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadFoo(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
FailedFooProvider
public class FailedFooProvider implements Function<FailedDeserializationInfo, String> {
@Override
public String apply(FailedDeserializationInfo info) {
System.out.println("");
return "";
}
}
见the documentation here and here。
另请查看 DeadLetterPublishingRecoverer
代码,它可用于将失败的记录发布到其他主题。您可以在此之后为您的代码建模以获得包含失败的 byte[]
.
恢复器与SeekToCurrentErrorHandler
结合使用。
将错误处理程序配置为 @Bean
并且 Spring 引导会自动将其连接到容器中。