如何在 Spring Cloud Stream Kafka Binder 中为死信队列应用保留时间配置?

How to apply retention time configuration for Dead Letter Queue in Spring Cloud Stream Kafka Binder?

我有一个使用 Spring Cloud Stream Kafka 的应用程序。 对于用户定义的主题,我可以通过提供下面提到的配置来删除指定主题中的记录。但此配置不适用于 DLQ 主题。

例如在下面的配置中,我在活页夹级别配置了保留时间。所以我在绑定级别下定义的生产者主题(学生主题)已正确配置,我可以检查当主题日志超过指定保留字节(300000000)时记录是否被删除。

但是活页夹级别保留时间不起作用 DLQ 主题(人员主题错误-dlq)。除了保留时间之外,是否有任何不同的 DLQ 主题清理记录配置。

我该怎么做?

spring:
  cloud:
    stream:
      kafka:
        bindings:
          person-topic-in:
            consumer:
              enableDlq: true
              dlqName: person-topic-error-dlq
      binders:
        defaultKafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    default:
                      producer:
                        topic:
                          properties:
                            retention.bytes: 300000000
                            segment.bytes: 300000000
                    binder:
                      brokers: localhost:19092
      bindings:
        person-topic-in:
          binder: defaultKafka
          destination: person-topic
          contentType: application/json
          group: person-topic-group
        student-topic-out:
          binder: defaultKafka
          destination: student-topic
          contentType: application/json

您只是在设置生产者绑定的(默认)属性。

也就是说,这对我仍然不起作用:

      binders:
        defaultKafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    default:
                      producer:
                        topic:
                          properties:
                            retention.bytes: 300000000
                            segment.bytes: 300000000
                      consumer:
                        topic:
                          properties:
                            retention.bytes: 300000000
                            segment.bytes: 300000000

(这些属性甚至不适用于主要主题)。

看起来默认的 kafka 消费者绑定属性有问题。

这对我有用;这些属性适用于主要主题和死信主题:

spring:
  cloud:
    stream:
      kafka:
        bindings:
          person-topic-in:
            consumer:
              enableDlq: true
              dlqName: person-topic-error-dlq
              topic:
                properties:
                  retention.bytes: 300000000
                  segment.bytes: 300000000