使用启用批处理模式的 Spring Cloud Stream 在 Kafka 中实施 DLQ

Implementing DLQ in Kafka using Spring Cloud Stream with Batch mode enabled

我正在尝试使用 spring 启用批处理模式的云流来实施 DLQ

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BatchErrorHandler handler) {
        return ((container, destinationName, group) -> {
              if(dlqEnabledTopic.contains(destinationName))
                                    container.setBatchErrorHandler(handler);});
    }

    @Bean
    public BatchErrorHandler batchErrorHandler(KafkaOperations<String, byte[]> kafkaOperations) {
        CustomDeadLetterPublishingRecoverer recoverer = new CustomDeadLetterPublishingRecoverer(kafkaOperations,
                (cr, e) -> new TopicPartition(cr.topic()+"_dlq", cr.partition()));
        return new RecoveringBatchErrorHandler(recoverer, new FixedBackOff(1000, 1));
    }

但有一些疑问:

  1. 如何使用属性配置 key/value 序列化程序 - 我的消息是字符串类型,但 KafkaOperations 使用的是 ByteArraySerializer

  2. 批处理中有多条消息,但如果第一条消息失败,它会转到 DLQ,但看不到下一条消息的处理。

要求 - 在任何索引处,如果批处理失败,我只需要将该消息发送到 DLQ,其余消息应再次处理。

  1. DLQ 现在支持批处理模式吗?就像记录模式一样,可以使用属性启用它
  1. spring.kafka.producer.* 属性 - 但是,DLT 发布应使用与主流应用程序相同的序列化程序。 ByteArraySerializer 大体上是正确的。

  2. 正在恢复的批处理错误处理程序将查找未处理的记录并将它们返回。调试日志记录应该可以帮助您找出问题所在。如果您无法理解,请提供一个 MCRE 来展示您所看到的行为。

  3. 否;活页夹不支持批处理模式的 DLQ;配置错误处理程序是正确的方法。