如何停止 Spring Kafka Listener 并在 Consumer 停止后调用一些函数来执行?

How to stop Spring Kafka Listner and invoke some function to execute after the Consumer is stopped?

我有一个场景,我需要停止 kafka 并在消费者停止后调用一些函数来执行。同样,流程将是这样的:

  1. 使用来自 kafka 主题的消息
  2. 将每个消费的消息添加到文件
  3. 如果在过去的 10 秒内没有收到任何消息,则停止 kafka 侦听器
  4. 调用一些函数 例如:UploadFileToS3()

我在我的消费者方法上使用 spring kafka 的 @KafkaListener 注释。

我知道停止用@KafkaListener 注释的 Kafka 消费者我可以使用 KafkaListenerEndpointRegistry

我想要这样的东西:

if(not consumed messages for more than 10s) {
    kafkaListenerEndpointRegistry.getListenerContainer("my-listener-id").stop();
    UploadFileToS3()
}

如何使用 Spring Kafka 执行此操作?

参见ListenerContainerIdleEvent

/**
 * An event that is emitted when a container is idle if the container
 * is configured to do so.
 *
 * @author Gary Russell
 *
 */
public class ListenerContainerIdleEvent extends KafkaEvent {

文档在这里:https://docs.spring.io/spring-kafka/docs/current/reference/html/#idle-containers