Spring Cloud Stream with Kafka Binder: /bindings Actuator API 不会停止生产者
Spring Cloud Stream with Kafka Binder: /bindings Actuator API does not stop producer
我有一个 Spring Cloud Stream 项目,带有 Actuator 和 Kafka 活页夹。我正在探索 bindings/
执行器并尝试停止生产者作为练习。我通过 curl 发出以下 POST 请求:
curl -v 'localhost:8081/actuator/bindings/producer-out-0' -H 'content-type: application/json' -d '{"state": "STOPPED"}'
实际结果:
查询 returns 204。生产者的状态(从 GET /actuator/bindings/producer-out-0 中看到)现在是 stopped
。但是,生产者仍在生产消息,这可以从主题的日志记录和消费者 activity 中看出。
预期结果:
我希望生产者停止生产消息。 (我也试过使用 PAUSED 状态,也是 returns 204,但是错误日志表明这个生产者不能被暂停。)
我是不是误解了这个执行器的工作原理?当生产者停止时,是否预计 S.C.S。会继续投票那个制作人吗?我知道的唯一文档是 here,但据我所知它没有回答我的问题。
背景:
我正在使用 spring-boot-starter-parent 2.5.3 并将 starter-web 和 starter-actuator 列为依赖项。我不认为我遗漏了任何东西。
这是 producer/consumer 对。如您所见,我正在使用可轮询的供应商。
@Configuration
@Profile("numbers")
public class NumberHandlers {
private static final Logger LOGGER = LoggerFactory.getLogger(NumberHandlers.class);
@Bean
public Supplier<Integer> producer() {
// Needed an effectively-final mutable integer. Side-bar comments welcome. :P
var counter = new AtomicInteger();
return () -> {
var n = counter.getAndIncrement();
LOGGER.info("Producing number: " + n);
return n;
};
}
@Bean
public Consumer<Integer> consumer() {
return it -> LOGGER.info("Consuming number: " + it);
}
}
当我传入 numbers
配置文件时,这些是活动的。我的配置如下。
application.yml:
server:
port: 8081
spring:
cloud:
stream:
kafka:
binder:
brokers: ${env.kafka.bootstrapservers:localhost}
management:
endpoints:
web:
exposure:
include: 'bindings'
...和应用程序-numbers.yml:
spring:
cloud:
stream:
poller:
fixedDelay: 5000
bindings:
producer-out-0:
destination: numbers-raw
producer:
partitionCount: 3
consumer-in-0:
destination: numbers-raw
kafka:
bindings:
producer-out-0:
producer:
topic.properties:
# These look weird because they're done as an exercise.
retention.bytes: 10000
retention.ms: 172800000
function:
definition: producer;consumer
我正在本地主机环境中使用主机网络上的 docker-compose kafka 和 zookeeper 进行测试。
谢谢!
目前不支持生产者绑定的生命周期控制,仅支持消费者绑定。
我有一个 Spring Cloud Stream 项目,带有 Actuator 和 Kafka 活页夹。我正在探索 bindings/
执行器并尝试停止生产者作为练习。我通过 curl 发出以下 POST 请求:
curl -v 'localhost:8081/actuator/bindings/producer-out-0' -H 'content-type: application/json' -d '{"state": "STOPPED"}'
实际结果:
查询 returns 204。生产者的状态(从 GET /actuator/bindings/producer-out-0 中看到)现在是 stopped
。但是,生产者仍在生产消息,这可以从主题的日志记录和消费者 activity 中看出。
预期结果: 我希望生产者停止生产消息。 (我也试过使用 PAUSED 状态,也是 returns 204,但是错误日志表明这个生产者不能被暂停。)
我是不是误解了这个执行器的工作原理?当生产者停止时,是否预计 S.C.S。会继续投票那个制作人吗?我知道的唯一文档是 here,但据我所知它没有回答我的问题。
背景:
我正在使用 spring-boot-starter-parent 2.5.3 并将 starter-web 和 starter-actuator 列为依赖项。我不认为我遗漏了任何东西。
这是 producer/consumer 对。如您所见,我正在使用可轮询的供应商。
@Configuration
@Profile("numbers")
public class NumberHandlers {
private static final Logger LOGGER = LoggerFactory.getLogger(NumberHandlers.class);
@Bean
public Supplier<Integer> producer() {
// Needed an effectively-final mutable integer. Side-bar comments welcome. :P
var counter = new AtomicInteger();
return () -> {
var n = counter.getAndIncrement();
LOGGER.info("Producing number: " + n);
return n;
};
}
@Bean
public Consumer<Integer> consumer() {
return it -> LOGGER.info("Consuming number: " + it);
}
}
当我传入 numbers
配置文件时,这些是活动的。我的配置如下。
application.yml:
server:
port: 8081
spring:
cloud:
stream:
kafka:
binder:
brokers: ${env.kafka.bootstrapservers:localhost}
management:
endpoints:
web:
exposure:
include: 'bindings'
...和应用程序-numbers.yml:
spring:
cloud:
stream:
poller:
fixedDelay: 5000
bindings:
producer-out-0:
destination: numbers-raw
producer:
partitionCount: 3
consumer-in-0:
destination: numbers-raw
kafka:
bindings:
producer-out-0:
producer:
topic.properties:
# These look weird because they're done as an exercise.
retention.bytes: 10000
retention.ms: 172800000
function:
definition: producer;consumer
我正在本地主机环境中使用主机网络上的 docker-compose kafka 和 zookeeper 进行测试。
谢谢!
目前不支持生产者绑定的生命周期控制,仅支持消费者绑定。