当与 Apache Kafka Server 的连接丢失时,如何使用 Spring Kafka Listener 停止微服务?
How to stop micro service with Spring Kafka Listener, when connection to Apache Kafka Server is lost?
我目前正在实现一个微服务,它从 Apache Kafka 主题中读取数据。我对微服务使用 "spring-boot, version: 1.5.6.RELEASE",对同一微服务中的侦听器使用 "spring-kafka, version: 1.2.2.RELEASE"。这是我的卡夫卡配置:
@Bean
public Map<String, Object> consumerConfigs() {
return new HashMap<String, Object>() {{
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
}};
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
我已经通过 @KafkaListener
注释实现了监听器:
@KafkaListener(topics = "${kafka.dataSampleTopic}")
public void receive(ConsumerRecord<String, String> payload) {
//business logic
latch.countDown();
}
我需要能够在侦听器与 Apache Kafka 服务器断开连接时关闭微服务。
当我终止 kafka 服务器时,我在 spring 引导日志中收到以下消息:
2017-11-01 19:58:15.721 INFO 16800 --- [ 0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator 192.168.0.4:9092 (id: 2145482646 rack: null) dead for group TestGroup
当我启动kafka服务器时,我得到:
2017-11-01 20:01:37.748 INFO 16800 --- [ 0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Discovered coordinator 192.168.0.4:9092 (id: 2145482646 rack: null) for group TestGroup.
很明显,我的微服务中的 Spring Kafka Listener 能够检测到 Kafka 服务器何时启动,何时 运行 以及何时停止。在书中按汇合Kafka The Definitive Guide in chapter But How Do We Exit?
it is said that the wakeup()
method needs to be called on the Consumer, so that a WakeupException
would be thrown. So I tried to capture the two events (Kafka server down and Kafka server up) with the @EventListener
tag, as described in the Spring for Apache Kafka documentation,再调用wakeup()
。但是文档中的示例是关于如何检测空闲消费者的,这不是我的情况。有人可以帮我解决这个问题吗?提前致谢。
我不知道如何获得服务器停机情况的通知(根据我的经验,消费者会在 poll()
内进入一个紧密循环)。
但是,如果您弄清楚了,您可以停止侦听器容器,这将唤醒消费者并退出紧密循环...
@Autowired
private KafkaListenerEndpointRegistry registry;
...
this.registry.stop();
2017-11-01 16:29:54.290 INFO 21217 --- [ad | so47062346] o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator localhost:9092 (id: 2147483647 rack: null) dead for group so47062346
2017-11-01 16:29:54.346 WARN 21217 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : Connection to node 0 could not be established. Broker may not be available.
...
2017-11-01 16:30:00.643 WARN 21217 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : Connection to node 0 could not be established. Broker may not be available.
2017-11-01 16:30:00.680 INFO 21217 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Consumer stopped
您可以通过添加 reconnect.backoff.ms
来改进紧密循环,但是 poll()
永远不会退出,因此我们无法发出空闲事件。
spring:
kafka:
consumer:
enable-auto-commit: false
group-id: so47062346
properties:
reconnect.backoff.ms: 1000
我想你可以启用空闲事件并使用计时器来检测你是否在一段时间内没有收到数据(或空闲事件),然后停止容器。
我目前正在实现一个微服务,它从 Apache Kafka 主题中读取数据。我对微服务使用 "spring-boot, version: 1.5.6.RELEASE",对同一微服务中的侦听器使用 "spring-kafka, version: 1.2.2.RELEASE"。这是我的卡夫卡配置:
@Bean
public Map<String, Object> consumerConfigs() {
return new HashMap<String, Object>() {{
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
}};
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
我已经通过 @KafkaListener
注释实现了监听器:
@KafkaListener(topics = "${kafka.dataSampleTopic}")
public void receive(ConsumerRecord<String, String> payload) {
//business logic
latch.countDown();
}
我需要能够在侦听器与 Apache Kafka 服务器断开连接时关闭微服务。
当我终止 kafka 服务器时,我在 spring 引导日志中收到以下消息:
2017-11-01 19:58:15.721 INFO 16800 --- [ 0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator 192.168.0.4:9092 (id: 2145482646 rack: null) dead for group TestGroup
当我启动kafka服务器时,我得到:
2017-11-01 20:01:37.748 INFO 16800 --- [ 0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Discovered coordinator 192.168.0.4:9092 (id: 2145482646 rack: null) for group TestGroup.
很明显,我的微服务中的 Spring Kafka Listener 能够检测到 Kafka 服务器何时启动,何时 运行 以及何时停止。在书中按汇合Kafka The Definitive Guide in chapter But How Do We Exit?
it is said that the wakeup()
method needs to be called on the Consumer, so that a WakeupException
would be thrown. So I tried to capture the two events (Kafka server down and Kafka server up) with the @EventListener
tag, as described in the Spring for Apache Kafka documentation,再调用wakeup()
。但是文档中的示例是关于如何检测空闲消费者的,这不是我的情况。有人可以帮我解决这个问题吗?提前致谢。
我不知道如何获得服务器停机情况的通知(根据我的经验,消费者会在 poll()
内进入一个紧密循环)。
但是,如果您弄清楚了,您可以停止侦听器容器,这将唤醒消费者并退出紧密循环...
@Autowired
private KafkaListenerEndpointRegistry registry;
...
this.registry.stop();
2017-11-01 16:29:54.290 INFO 21217 --- [ad | so47062346] o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator localhost:9092 (id: 2147483647 rack: null) dead for group so47062346
2017-11-01 16:29:54.346 WARN 21217 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : Connection to node 0 could not be established. Broker may not be available.
...
2017-11-01 16:30:00.643 WARN 21217 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : Connection to node 0 could not be established. Broker may not be available.
2017-11-01 16:30:00.680 INFO 21217 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Consumer stopped
您可以通过添加 reconnect.backoff.ms
来改进紧密循环,但是 poll()
永远不会退出,因此我们无法发出空闲事件。
spring:
kafka:
consumer:
enable-auto-commit: false
group-id: so47062346
properties:
reconnect.backoff.ms: 1000
我想你可以启用空闲事件并使用计时器来检测你是否在一段时间内没有收到数据(或空闲事件),然后停止容器。