Quarkus Kafka反序列化死信队列异常

Quarkus Kafka Deserializing Exception to Dead Letter Queue

为了满足我服务的可靠性,我需要使用 kafka-smallryequarkus.[=15 将所有无法反序列化的传入消息推送到死信主题中=]

关于该主题的所有消息都应采用 avro 格式(但我无法确定),并在架构注册表中定义架构。

我是这样设置我的消费者的配置的:

mp:
  messaging:
    incoming:
      test-in:
        connector: smallrye-kafka
        group:
          id: test-in-consumer-group
        topic: events-topic
        failure-strategy: dead-letter-queue
        schema:
          registry:
            url: http://localhost:8081
        value:
          deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
          subject:
            name:
              strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
        key:
          deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
          subject:
            name:
              strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
        specific:
          avro:
            reader: true

我的消费代码:

@ApplicationScoped
public class Consumer {

    @Incoming("test-in")
    public CompletionStage<Void> store(KafkaRecord<Key,SpecificRecord> data ){
            String schemaFullName = data.getPayload().getSchema().getFullName();
            System.out.println(schemaFullName);

            // other consumer code
            return data.ack();
    }
}

当消费者无法反序列化消息时,消费进程将被阻止,而是将消息移至死信并继续。我假设反序列化错误不会产生 nack,因此无法将消息移至死信。

有办法将不可反序列化的消息移动到死信主题吗?

我使用 DeserializationFailureHandler 解决了。 您必须使用“dead-letter-queue”作为正常主题并发送您的失败推送。

@ApplicationScoped
@Identifier("failure-dead-letter") // Set the name of the failure handler
public class MyDeserializationFailureHandler
    implements DeserializationFailureHandler<CustomBean> { // Specify the expected type

    private static final Logger LOGGER = Logger.getLogger(MyDeserializationFailureHandler.class);

    @Inject
    @Channel("dead-letter")
    Emitter<DeadLetterBean> deadLetterBeanEmitter;

    @Override
    public CustomBean handleDeserializationFailure(String topic, boolean isKey, String deserializer, byte[] data,
                                           Exception exception, Headers headers) {
        LOGGER.error("ERROR: " + exception.getMessage());
        deadLetterBeanEmitter.send(Message.of(new DeadLetterBean(topic, isKey, deserializer, data, exception))
                                          .withAck(() -> {
                                              // Called when the message is acked
                                              LOGGER.error("SENT TO DEAD LETTER");
                                              return CompletableFuture.completedFuture(null);
                                          })

                                          .withNack(throwable -> {
                                              // Called when the message is nacked
                                              LOGGER.error("ERROR, NOT SENT DEAD LETTER");
                                              return CompletableFuture.completedFuture(null);
                                          }));
        return null;
    }
}

也将新主题注册为发布者,deserialization-failure-handler

    mp.messaging.outgoing.dead-letter.topic=dead-letter-topic-name
    mp.messaging.outgoing.dead-letter.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
    mp.messaging.incoming.message-in.value-deserialization-failure-handler=failure-dead-letter