如何在 Spring Cloud Stream Kafka Binder 中为死信队列应用保留时间配置?
How to apply retention time configuration for Dead Letter Queue in Spring Cloud Stream Kafka Binder?
我有一个使用 Spring Cloud Stream Kafka 的应用程序。
对于用户定义的主题,我可以通过提供下面提到的配置来删除指定主题中的记录。但此配置不适用于 DLQ 主题。
例如在下面的配置中,我在活页夹级别配置了保留时间。所以我在绑定级别下定义的生产者主题(学生主题)已正确配置,我可以检查当主题日志超过指定保留字节(300000000)时记录是否被删除。
但是活页夹级别保留时间不起作用 DLQ 主题(人员主题错误-dlq)。除了保留时间之外,是否有任何不同的 DLQ 主题清理记录配置。
我该怎么做?
spring:
cloud:
stream:
kafka:
bindings:
person-topic-in:
consumer:
enableDlq: true
dlqName: person-topic-error-dlq
binders:
defaultKafka:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
default:
producer:
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000
binder:
brokers: localhost:19092
bindings:
person-topic-in:
binder: defaultKafka
destination: person-topic
contentType: application/json
group: person-topic-group
student-topic-out:
binder: defaultKafka
destination: student-topic
contentType: application/json
您只是在设置生产者绑定的(默认)属性。
也就是说,这对我仍然不起作用:
binders:
defaultKafka:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
default:
producer:
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000
consumer:
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000
(这些属性甚至不适用于主要主题)。
看起来默认的 kafka 消费者绑定属性有问题。
这对我有用;这些属性适用于主要主题和死信主题:
spring:
cloud:
stream:
kafka:
bindings:
person-topic-in:
consumer:
enableDlq: true
dlqName: person-topic-error-dlq
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000
我有一个使用 Spring Cloud Stream Kafka 的应用程序。 对于用户定义的主题,我可以通过提供下面提到的配置来删除指定主题中的记录。但此配置不适用于 DLQ 主题。
例如在下面的配置中,我在活页夹级别配置了保留时间。所以我在绑定级别下定义的生产者主题(学生主题)已正确配置,我可以检查当主题日志超过指定保留字节(300000000)时记录是否被删除。
但是活页夹级别保留时间不起作用 DLQ 主题(人员主题错误-dlq)。除了保留时间之外,是否有任何不同的 DLQ 主题清理记录配置。
我该怎么做?
spring:
cloud:
stream:
kafka:
bindings:
person-topic-in:
consumer:
enableDlq: true
dlqName: person-topic-error-dlq
binders:
defaultKafka:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
default:
producer:
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000
binder:
brokers: localhost:19092
bindings:
person-topic-in:
binder: defaultKafka
destination: person-topic
contentType: application/json
group: person-topic-group
student-topic-out:
binder: defaultKafka
destination: student-topic
contentType: application/json
您只是在设置生产者绑定的(默认)属性。
也就是说,这对我仍然不起作用:
binders:
defaultKafka:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
default:
producer:
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000
consumer:
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000
(这些属性甚至不适用于主要主题)。
看起来默认的 kafka 消费者绑定属性有问题。
这对我有用;这些属性适用于主要主题和死信主题:
spring:
cloud:
stream:
kafka:
bindings:
person-topic-in:
consumer:
enableDlq: true
dlqName: person-topic-error-dlq
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000