使用属性文件在 Spring Kafka 中处理错误?

Error handling in Spring Kafka with properties file?

在我的 Kafka Listener 被命中之前,我遇到了一堆反序列化失败。我正在研究 Gary Russel 构建的东西,但在让它工作时遇到了问题。我所有的东西都是通过属性文件配置的。

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer

所以如果我添加这些,我的理解是它在消费者记录的 headers 中包装了一个错误?我的最终目标是让任何反序列化异常命中我拥有的一些自定义 class,这样我就可以处理我想用它做的事情。 IE,转发到我的死信处理程序,它将失败的数据上传到 s3。

我尝试将错误处理程序标志添加到 kafkalistener,但这也没有做任何事情。

已更新属性 配置

我已经更新了我的配置,我仍然不清楚这是否正确。它不工作,所以我假设不。

正在调用自定义代码的

None

spring.kafka.consumer.properties.value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.key.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.function=com.thing.cyclic.service.FailedFooProvider

spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicNameStrategy
spring.kafka.consumer.properties.specific.avro.reader=true
spring.kafka.consumer.properties.auto.register.schemas=false
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.listener.ack-mode=manual_immediate

BadFoo

public class BadFoo {

    private final FailedDeserializationInfo failedDeserializationInfo;

    public BadFoo(FailedDeserializationInfo failedDeserializationInfo) {
        this.failedDeserializationInfo = failedDeserializationInfo;
    }

    public FailedDeserializationInfo getFailedDeserializationInfo() {
        return this.failedDeserializationInfo;
    }
}

FailedFooProvider

public class FailedFooProvider implements Function<FailedDeserializationInfo, String> {
    @Override
    public String apply(FailedDeserializationInfo info) {
        System.out.println("");
        return "";
    }
}

the documentation here and here

另请查看 DeadLetterPublishingRecoverer 代码,它可用于将失败的记录发布到其他主题。您可以在此之后为您的代码建模以获得包含失败的 byte[].

的 header(s)

https://github.com/spring-projects/spring-kafka/blob/fa5c35e9b15c4cecfc6ea2bbbf9e7745bc5d9f75/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java#L169-L178

恢复器与SeekToCurrentErrorHandler结合使用。

将错误处理程序配置为 @Bean 并且 Spring 引导会自动将其连接到容器中。