Quarkus Kafka反序列化死信队列异常
Quarkus Kafka Deserializing Exception to Dead Letter Queue
为了满足我服务的可靠性,我需要使用 kafka-smallrye
和 quarkus
.[=15 将所有无法反序列化的传入消息推送到死信主题中=]
关于该主题的所有消息都应采用 avro 格式(但我无法确定),并在架构注册表中定义架构。
我是这样设置我的消费者的配置的:
mp:
messaging:
incoming:
test-in:
connector: smallrye-kafka
group:
id: test-in-consumer-group
topic: events-topic
failure-strategy: dead-letter-queue
schema:
registry:
url: http://localhost:8081
value:
deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
subject:
name:
strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
key:
deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
subject:
name:
strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
specific:
avro:
reader: true
我的消费代码:
@ApplicationScoped
public class Consumer {
@Incoming("test-in")
public CompletionStage<Void> store(KafkaRecord<Key,SpecificRecord> data ){
String schemaFullName = data.getPayload().getSchema().getFullName();
System.out.println(schemaFullName);
// other consumer code
return data.ack();
}
}
当消费者无法反序列化消息时,消费进程将被阻止,而是将消息移至死信并继续。我假设反序列化错误不会产生 nack
,因此无法将消息移至死信。
有办法将不可反序列化的消息移动到死信主题吗?
我使用 DeserializationFailureHandler 解决了。
您必须使用“dead-letter-queue”作为正常主题并发送您的失败推送。
@ApplicationScoped
@Identifier("failure-dead-letter") // Set the name of the failure handler
public class MyDeserializationFailureHandler
implements DeserializationFailureHandler<CustomBean> { // Specify the expected type
private static final Logger LOGGER = Logger.getLogger(MyDeserializationFailureHandler.class);
@Inject
@Channel("dead-letter")
Emitter<DeadLetterBean> deadLetterBeanEmitter;
@Override
public CustomBean handleDeserializationFailure(String topic, boolean isKey, String deserializer, byte[] data,
Exception exception, Headers headers) {
LOGGER.error("ERROR: " + exception.getMessage());
deadLetterBeanEmitter.send(Message.of(new DeadLetterBean(topic, isKey, deserializer, data, exception))
.withAck(() -> {
// Called when the message is acked
LOGGER.error("SENT TO DEAD LETTER");
return CompletableFuture.completedFuture(null);
})
.withNack(throwable -> {
// Called when the message is nacked
LOGGER.error("ERROR, NOT SENT DEAD LETTER");
return CompletableFuture.completedFuture(null);
}));
return null;
}
}
也将新主题注册为发布者,deserialization-failure-handler
mp.messaging.outgoing.dead-letter.topic=dead-letter-topic-name
mp.messaging.outgoing.dead-letter.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
mp.messaging.incoming.message-in.value-deserialization-failure-handler=failure-dead-letter
为了满足我服务的可靠性,我需要使用 kafka-smallrye
和 quarkus
.[=15 将所有无法反序列化的传入消息推送到死信主题中=]
关于该主题的所有消息都应采用 avro 格式(但我无法确定),并在架构注册表中定义架构。
我是这样设置我的消费者的配置的:
mp:
messaging:
incoming:
test-in:
connector: smallrye-kafka
group:
id: test-in-consumer-group
topic: events-topic
failure-strategy: dead-letter-queue
schema:
registry:
url: http://localhost:8081
value:
deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
subject:
name:
strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
key:
deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
subject:
name:
strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
specific:
avro:
reader: true
我的消费代码:
@ApplicationScoped
public class Consumer {
@Incoming("test-in")
public CompletionStage<Void> store(KafkaRecord<Key,SpecificRecord> data ){
String schemaFullName = data.getPayload().getSchema().getFullName();
System.out.println(schemaFullName);
// other consumer code
return data.ack();
}
}
当消费者无法反序列化消息时,消费进程将被阻止,而是将消息移至死信并继续。我假设反序列化错误不会产生 nack
,因此无法将消息移至死信。
有办法将不可反序列化的消息移动到死信主题吗?
我使用 DeserializationFailureHandler 解决了。 您必须使用“dead-letter-queue”作为正常主题并发送您的失败推送。
@ApplicationScoped
@Identifier("failure-dead-letter") // Set the name of the failure handler
public class MyDeserializationFailureHandler
implements DeserializationFailureHandler<CustomBean> { // Specify the expected type
private static final Logger LOGGER = Logger.getLogger(MyDeserializationFailureHandler.class);
@Inject
@Channel("dead-letter")
Emitter<DeadLetterBean> deadLetterBeanEmitter;
@Override
public CustomBean handleDeserializationFailure(String topic, boolean isKey, String deserializer, byte[] data,
Exception exception, Headers headers) {
LOGGER.error("ERROR: " + exception.getMessage());
deadLetterBeanEmitter.send(Message.of(new DeadLetterBean(topic, isKey, deserializer, data, exception))
.withAck(() -> {
// Called when the message is acked
LOGGER.error("SENT TO DEAD LETTER");
return CompletableFuture.completedFuture(null);
})
.withNack(throwable -> {
// Called when the message is nacked
LOGGER.error("ERROR, NOT SENT DEAD LETTER");
return CompletableFuture.completedFuture(null);
}));
return null;
}
}
也将新主题注册为发布者,deserialization-failure-handler
mp.messaging.outgoing.dead-letter.topic=dead-letter-topic-name
mp.messaging.outgoing.dead-letter.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
mp.messaging.incoming.message-in.value-deserialization-failure-handler=failure-dead-letter