How to set offset commit interval for Spring Cloud Stream Kafka via properties

I want to set the automatic offset commit interval on a Kafka consumer in Spring Cloud Stream via properties.

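As I can see from the metrics, by default Spring Cloud Stream Kafka commits the offset on every consumed message. For high-load topics (for example, at 10K messages per second), this becomes very noticeable and increases the load on the Kafka brokers.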

We declare the consumer as follows:

@Bean
public Consumer<TestEvent> testEvents() {
    …
}

I tried several options, but none of them helped.

Option #1

spring:
  cloud:
    function:
      definition: testEvents
    stream:
      kafka:
        binder:
          brokers: xxx
          consumer-properties:
            auto.offset.reset: latest
            auto.commit.interval.ms: 2000
            enable.auto.commit: true
      bindings:
        testEvents-in-0:
          destination: test_topic
          group: ${spring.application.name}_test_topic
          consumer:
            concurrency: 2

With this configuration, messages are consumed correctly, but the health check reports the status DOWN:

{
  "status": "DOWN",
  "components": {
    "binders": {
      "status": "DOWN",
      "components": {
        "kafka": {
          "status": "DOWN",
          "details": {
            "error": "org.apache.kafka.common.KafkaException: Failed to construct kafka consumer"
          }
        }
      }
    }, …

As far as I understand, enable.auto.commit: true is not recommended for Spring Cloud Stream.

Option #2

spring:
  cloud:
    function:
      definition: testEvents
    stream:
      kafka:
        binder:
          brokers: xxx
          configuration:
            auto.offset.reset: latest
      bindings:
        testEvents-in-0:
          destination: test_topic
          group: ${spring.application.name}_test_topic
          consumer:
            concurrency: 2
            ack-mode: TIME
            ack-time: 2000

With this configuration, the ack-mode: TIME property still does not help: we still see an offset commit for every consumed message.

I use the Maven dependency spring-cloud-starter-stream-kafka, version 3.0.12.RELEASE.

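First, ack-mode is a Kafka-specific consumer binding property, but you have it in the common binding properties. It belongs under spring.cloud.stream.kafka.bindings, not spring.cloud.stream.bindings: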

spring:
  cloud:
    function:
      definition: testEvents
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          configuration:
            auto.offset.reset: latest
        bindings:
          testEvents-in-0:
            consumer:
              ack-mode: TIME
      bindings:
        testEvents-in-0:
          destination: test_topic
          group: ${spring.application.name}_test_topic
          consumer:
            concurrency: 2

Second, there is no ack-time property; you have to set it on the container through a container customizer:

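// Needs: org.springframework.context.annotation.Bean,
// org.springframework.cloud.stream.config.ListenerContainerCustomizer,
// org.springframework.kafka.listener.AbstractMessageListenerContainer
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> cust() {
    return (container, dest, group) -> {
        // With ack-mode TIME, pending offsets are committed once 2000 ms have passed since the last commit.
        container.getContainerProperties().setAckTime(2000L);
        // Log the effective container configuration so the settings can be verified.
        container.getContainerProperties().setLogContainerConfig(true);
    };
}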
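To illustrate why a time-based ack mode cuts commit traffic, here is a minimal plain-Java sketch. It is a simplified model, not Spring Kafka's actual container logic: the class TimeAckSketch and its methods are hypothetical names, the clock is simulated, and offsets still pending at the end are not flushed (a real container commits them when it stops).

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of time-based offset commits; NOT Spring Kafka's real implementation. */
public class TimeAckSketch {

    /**
     * Returns the offsets that would be committed if a commit is issued only
     * when at least ackTimeMs has elapsed since the previous commit.
     *
     * @param recordTimesMs time at which each record finished processing (ascending);
     *                      the array index doubles as the record's offset
     * @param ackTimeMs     minimum interval between two commits
     */
    public static List<Long> committedOffsets(long[] recordTimesMs, long ackTimeMs) {
        List<Long> commits = new ArrayList<>();
        long lastCommitMs = 0L; // assumption: the consumer starts at simulated time 0
        for (int offset = 0; offset < recordTimesMs.length; offset++) {
            if (recordTimesMs[offset] - lastCommitMs >= ackTimeMs) {
                // Kafka convention: the committed value is the next offset to read.
                commits.add(offset + 1L);
                lastCommitMs = recordTimesMs[offset];
            }
        }
        return commits;
    }

    /** Builds completion times for a steady stream of recordsPerMs records per millisecond. */
    public static long[] steadyStream(int count, int recordsPerMs) {
        long[] times = new long[count];
        for (int i = 0; i < count; i++) {
            times[i] = i / recordsPerMs;
        }
        return times;
    }

    public static void main(String[] args) {
        // 10K messages per second for 6 seconds = 60,000 records, 10 per millisecond.
        long[] times = steadyStream(60_000, 10);
        System.out.println("ackTime=2000 ms -> commits: " + committedOffsets(times, 2000L).size());
        System.out.println("ackTime=0 ms    -> commits: " + committedOffsets(times, 0L).size());
    }
}
```

In this model, a 2000 ms interval turns 60,000 per-record commits into 2 commits over the same 6 seconds, which is the kind of reduction the question is after.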

Gary Russell's answer is absolutely correct. I just want to add a similar alternative that uses the spring.kafka.listener properties and KafkaProperties (from the package org.springframework.boot.autoconfigure.kafka):

spring:
  cloud:
    function:
      definition: testEvents
    stream:
      kafka:
        binder:
          brokers: xxx
          configuration:
            auto.offset.reset: latest
      bindings:
        testEvents-in-0:
          destination: test_topic
          group: ${spring.application.name}_test_topic
          consumer:
            concurrency: 2
  kafka:
    listener:
      ack-mode: TIME
      ack-time: 1000

and the following customizer:

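@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(KafkaProperties kafkaProperties) {
    return (container, destinationName, group) -> {
        // Values bound from spring.kafka.listener.ack-mode and spring.kafka.listener.ack-time.
        // Both are null when not configured, so set them in the YAML above.
        var listener = kafkaProperties.getListener();
        var containerProperties = container.getContainerProperties();
        containerProperties.setAckMode(listener.getAckMode());
        // ack-time is bound as a Duration; the container expects milliseconds.
        containerProperties.setAckTime(listener.getAckTime().toMillis());
        // Log the effective container configuration so the settings can be verified.
        containerProperties.setLogContainerConfig(true);
    };
}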