使用启用批处理模式的 Spring Cloud Stream 在 Kafka 中实施 DLQ
Implementing DLQ in Kafka using Spring Cloud Stream with Batch mode enabled
我正在尝试使用 spring 启用批处理模式的云流来实施 DLQ
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BatchErrorHandler handler) {
return ((container, destinationName, group) -> {
if(dlqEnabledTopic.contains(destinationName))
container.setBatchErrorHandler(handler);});
}
@Bean
public BatchErrorHandler batchErrorHandler(KafkaOperations<String, byte[]> kafkaOperations) {
CustomDeadLetterPublishingRecoverer recoverer = new CustomDeadLetterPublishingRecoverer(kafkaOperations,
(cr, e) -> new TopicPartition(cr.topic()+"_dlq", cr.partition()));
return new RecoveringBatchErrorHandler(recoverer, new FixedBackOff(1000, 1));
}
但有一些疑问:
如何使用属性配置 key/value 序列化程序 - 我的消息是字符串类型,但 KafkaOperations 使用的是 ByteArraySerializer
批处理中有多条消息,但如果第一条消息失败,它会转到 DLQ,但看不到下一条消息的处理。
要求 - 在任何索引处,如果批处理失败,我只需要将该消息发送到 DLQ,其余消息应再次处理。
- DLQ 现在支持批处理模式吗?就像记录模式一样,可以使用属性启用它
spring.kafka.producer.*
属性 - 但是,DLT 发布应使用与主流应用程序相同的序列化程序。 ByteArraySerializer
大体上是正确的。
正在恢复的批处理错误处理程序将查找未处理的记录并将它们返回。调试日志记录应该可以帮助您找出问题所在。如果您无法理解,请提供一个 MCRE 来展示您所看到的行为。
否;活页夹不支持批处理模式的 DLQ;配置错误处理程序是正确的方法。
我正在尝试使用 spring 启用批处理模式的云流来实施 DLQ
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BatchErrorHandler handler) {
return ((container, destinationName, group) -> {
if(dlqEnabledTopic.contains(destinationName))
container.setBatchErrorHandler(handler);});
}
@Bean
public BatchErrorHandler batchErrorHandler(KafkaOperations<String, byte[]> kafkaOperations) {
CustomDeadLetterPublishingRecoverer recoverer = new CustomDeadLetterPublishingRecoverer(kafkaOperations,
(cr, e) -> new TopicPartition(cr.topic()+"_dlq", cr.partition()));
return new RecoveringBatchErrorHandler(recoverer, new FixedBackOff(1000, 1));
}
但有一些疑问:
如何使用属性配置 key/value 序列化程序 - 我的消息是字符串类型,但 KafkaOperations 使用的是 ByteArraySerializer
批处理中有多条消息,但如果第一条消息失败,它会转到 DLQ,但看不到下一条消息的处理。
要求 - 在任何索引处,如果批处理失败,我只需要将该消息发送到 DLQ,其余消息应再次处理。
- DLQ 现在支持批处理模式吗?就像记录模式一样,可以使用属性启用它
spring.kafka.producer.*
属性 - 但是,DLT 发布应使用与主流应用程序相同的序列化程序。ByteArraySerializer
大体上是正确的。正在恢复的批处理错误处理程序将查找未处理的记录并将它们返回。调试日志记录应该可以帮助您找出问题所在。如果您无法理解,请提供一个 MCRE 来展示您所看到的行为。
否;活页夹不支持批处理模式的 DLQ;配置错误处理程序是正确的方法。