Spring Cloud Stream with Kafka Binder: /bindings Actuator API 不会停止生产者

Spring Cloud Stream with Kafka Binder: /bindings Actuator API does not stop producer

我有一个 Spring Cloud Stream 项目,带有 Actuator 和 Kafka 活页夹。我正在探索 bindings/ 执行器并尝试停止生产者作为练习。我通过 curl 发出以下 POST 请求:

curl -v 'localhost:8081/actuator/bindings/producer-out-0' -H 'content-type: application/json' -d '{"state": "STOPPED"}'

实际结果: 查询 returns 204。生产者的状态(从 GET /actuator/bindings/producer-out-0 中看到)现在是 stopped。但是,生产者仍在生产消息,这可以从主题的日志记录和消费者 activity 中看出。

预期结果: 我希望生产者停止生产消息。 (我也试过使用 PAUSED 状态,也是 returns 204,但是错误日志表明这个生产者不能被暂停。)

我是不是误解了这个执行器的工作原理?当生产者停止时,是否预计 S.C.S。会继续投票那个制作人吗?我知道的唯一文档是 here,但据我所知它没有回答我的问题。

背景:

我正在使用 spring-boot-starter-parent 2.5.3 并将 starter-web 和 starter-actuator 列为依赖项。我不认为我遗漏了任何东西。

这是 producer/consumer 对。如您所见,我正在使用可轮询的供应商。

@Configuration
@Profile("numbers")
public class NumberHandlers {
  private static final Logger LOGGER = LoggerFactory.getLogger(NumberHandlers.class);

  @Bean
  public Supplier<Integer> producer() {
    // Needed an effectively-final mutable integer. Side-bar comments welcome. :P
    var counter = new AtomicInteger();
    return () -> {
      var n = counter.getAndIncrement();
      LOGGER.info("Producing number: " + n);
      return n;
    };
  }

  @Bean
  public Consumer<Integer> consumer() {
    return it -> LOGGER.info("Consuming number: " + it);
  }
}

当我传入 numbers 配置文件时,这些是活动的。我的配置如下。

application.yml:

server:
  port: 8081
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: ${env.kafka.bootstrapservers:localhost}
management:
  endpoints:
    web:
      exposure:
        include: 'bindings'

...和应用程序-numbers.yml:

spring:
  cloud:
    stream:
      poller:
        fixedDelay: 5000
      bindings:
        producer-out-0:
          destination: numbers-raw
          producer:
            partitionCount: 3
        consumer-in-0:
          destination: numbers-raw
      kafka:
        bindings:
          producer-out-0:
            producer:
              topic.properties:
                # These look weird because they're done as an exercise.
                retention.bytes: 10000
                retention.ms: 172800000
    function:
      definition: producer;consumer

我正在本地主机环境中使用主机网络上的 docker-compose kafka 和 zookeeper 进行测试。

谢谢!

目前不支持生产者绑定的生命周期控制,仅支持消费者绑定。