处理消息后关闭kafka消费者

Shutdown kafka consumer after processing messages

我正在使用 @KafkaListener(topics = "${topic}") 在 spring-boot 应用程序中使用来自主题的消息,我需要定期 运行。 spring-kafka版本是2.2.4.RELEASE.

实现此目的的一种方法是使用 fetch.max.wait.ms 每 6 小时进行一次批处理,但 6 小时对于此配置来说似乎太多了。

因此,我正在寻找一种方法在处理后关闭应用程序,并每 6 小时重新启动它。

其他方法如下,但不能保证应用程序在休眠时间内完成处理(下例中为 30 秒)。

public class Application {
    public static void main(String[] args) throws InterruptedException {
        ConfigurableApplicationContext run = SpringApplication.run(Application.class, args);
        Thread.sleep(30000);
        run.close();
    }
}

关闭消费者的优雅方式是什么,以确保关闭仅在处理完这批消息后发生?

当所有容器实例都空闲时关闭应用程序。