DeadLetterPublishingRecoverer not publishing the "original payload" on the .DLT topic
I am using spring-kafka-2.2.9.RELEASE and kafka_2.12-2.3.0.

I have been trying to get the original payload into the .DLT topic, but all I get is "null". I am sure this can be done with ErrorHandlingDeserializer2, SeekToCurrentErrorHandler, and DeadLetterPublishingRecoverer, but I am not sure what I am missing.
Producer and consumer:
Producer:
@Autowired
private ObjectMapper objectMapper;

@Bean
public ProducerFactory<Object, Object> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put("acks", "all");
    return new DefaultKafkaProducerFactory<Object, Object>(props, new JsonSerializer<Object>(objectMapper), new JsonSerializer<>());
    //return new DefaultKafkaProducerFactory<Object, Object>(props, new JsonSerializer<Object>(objectMapper), new JsonSerializer<Object>(objectMapper));
}

@Bean
public KafkaTemplate<Object, Object> kafkaTemplate() {
    return new KafkaTemplate<Object, Object>(producerFactory());
}
Consumer:
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;

@Autowired
private ObjectMapper objectMapper;

@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>(objectMapper);
    jsonDeserializer.addTrustedPackages("*");
    ErrorHandlingDeserializer2<Object> errorHandlingDeserializerKey = new ErrorHandlingDeserializer2<>(jsonDeserializer);
    ErrorHandlingDeserializer2<Object> errorHandlingDeserializerValue = new ErrorHandlingDeserializer2<>(jsonDeserializer);
    return new DefaultKafkaConsumerFactory<>(props, errorHandlingDeserializerKey, errorHandlingDeserializerValue);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), 3));
    return factory;
}
All I see in topic.DLT is "null":
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic.DLT --from-beginning
null
null
Thanks in advance for any help.
There is logic like this in ErrorHandlingDeserializer2:
private T recoverFromSupplier(String topic, Headers headers, byte[] data, Exception exception) {
    if (this.failedDeserializationFunction != null) {
        FailedDeserializationInfo failedDeserializationInfo =
                new FailedDeserializationInfo(topic, headers, data, this.isForKey, exception);
        return this.failedDeserializationFunction.apply(failedDeserializationInfo);
    }
    else {
        return null;
    }
}
So, if no failedDeserializationFunction is provided, the deserialized data comes back as null.
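Building on that, one way to keep the raw bytes would be to supply such a function on the value deserializer in consumerFactory(). A minimal sketch, assuming a hypothetical BadPayload wrapper class (not a spring-kafka type) and the setFailedDeserializationFunction setter:

```java
// Hypothetical wrapper for records that failed to deserialize; not part of spring-kafka.
public class BadPayload {
    private final byte[] originalData;

    public BadPayload(byte[] originalData) {
        this.originalData = originalData;
    }

    public byte[] getOriginalData() {
        return originalData;
    }
}

// Sketch for consumerFactory(): instead of returning null, hand back a wrapper
// carrying the raw bytes of the failed record.
JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>(objectMapper);
jsonDeserializer.addTrustedPackages("*");
ErrorHandlingDeserializer2<Object> valueDeserializer = new ErrorHandlingDeserializer2<>(jsonDeserializer);
// FailedDeserializationInfo.getData() carries the original payload bytes.
valueDeserializer.setFailedDeserializationFunction(info -> new BadPayload(info.getData()));
```

This is a configuration sketch only; with it in place, failed records reach the listener as BadPayload values rather than null.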
See the documentation for more details: https://docs.spring.io/spring-kafka/docs/2.3.1.RELEASE/reference/html/#error-handling-deserializer
If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer2
returns a null value and a DeserializationException
in a header that contains the cause and the raw bytes.
You can't currently consume the DLT records (published due to a deserialization exception) with a @KafkaListener in 2.2.x, because the container detects the DeserializationException header and sends the DLT record itself to the error handler.

I just fixed this and the fix will be available in 2.2.11.
However, if you use a plain KafkaConsumer, you can get the original value with this code...
Header exHeader = record.headers().lastHeader(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_EXCEPTION_HEADER);
DeserializationException ex = (DeserializationException) new ObjectInputStream(
new ByteArrayInputStream(exHeader.value())).readObject();
System.out.println("DLT: " + new String(ex.getData()));
Note: if you use the function Artem suggested, the record will go to your main listener instead of the error handler, so you have to handle it there. You will therefore need some kind of dummy value that contains the original payload.
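To illustrate that note, here is a sketch of what the main listener might look like, assuming the failedDeserializationFunction returns a hypothetical BadPayload dummy wrapper (not a spring-kafka type) exposing the original bytes:

```java
// Sketch only: BadPayload is a hypothetical dummy value returned by the
// failedDeserializationFunction, not a spring-kafka class.
@KafkaListener(topics = "topic")
public void listen(Object value) {
    if (value instanceof BadPayload) {
        // The error handler never sees this record, so deal with it here,
        // e.g. log it or publish it to a dead-letter topic yourself.
        byte[] original = ((BadPayload) value).getOriginalData();
        System.out.println("Failed record, raw payload: " + new String(original));
        return;
    }
    // normal processing of successfully deserialized records...
}
```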
It would certainly be simpler if you could upgrade to 2.3.1.