如何停止 Spring Kafka Listener 并在 Consumer 停止后调用一些函数来执行?
How to stop Spring Kafka Listner and invoke some function to execute after the Consumer is stopped?
我有一个场景,我需要停止 kafka 并在消费者停止后调用一些函数来执行。同样,流程将是这样的:
- 使用来自 kafka 主题的消息
- 将每个消费的消息添加到文件
- 如果在过去的 10 秒内没有收到任何消息,则停止 kafka 侦听器
- 调用一些函数 例如:UploadFileToS3()
我在我的消费者方法上使用 spring kafka 的 @KafkaListener 注释。
我知道停止用@KafkaListener 注释的 Kafka 消费者我可以使用 KafkaListenerEndpointRegistry
我想要这样的东西:
if(not consumed messages for more than 10s) {
kafkaListenerEndpointRegistry.getListenerContainer("my-listener-id").stop();
UploadFileToS3()
}
如何使用 Spring Kafka 执行此操作?
参见ListenerContainerIdleEvent
:
/**
* An event that is emitted when a container is idle if the container
* is configured to do so.
*
* @author Gary Russell
*
*/
public class ListenerContainerIdleEvent extends KafkaEvent {
文档在这里:https://docs.spring.io/spring-kafka/docs/current/reference/html/#idle-containers
我有一个场景,我需要停止 kafka 并在消费者停止后调用一些函数来执行。同样,流程将是这样的:
- 使用来自 kafka 主题的消息
- 将每个消费的消息添加到文件
- 如果在过去的 10 秒内没有收到任何消息,则停止 kafka 侦听器
- 调用一些函数 例如:UploadFileToS3()
我在我的消费者方法上使用 spring kafka 的 @KafkaListener 注释。
我知道停止用@KafkaListener 注释的 Kafka 消费者我可以使用 KafkaListenerEndpointRegistry
我想要这样的东西:
if(not consumed messages for more than 10s) {
kafkaListenerEndpointRegistry.getListenerContainer("my-listener-id").stop();
UploadFileToS3()
}
如何使用 Spring Kafka 执行此操作?
参见ListenerContainerIdleEvent
:
/**
* An event that is emitted when a container is idle if the container
* is configured to do so.
*
* @author Gary Russell
*
*/
public class ListenerContainerIdleEvent extends KafkaEvent {
文档在这里:https://docs.spring.io/spring-kafka/docs/current/reference/html/#idle-containers