Spring Kafka 在异常时停止容器

Spring Kafka Stop container on Exception

我正在使用 ConcurrentMessageListenerContainer 并将自动提交设置为 false 以使用来自主题的消息并写入数据库。如果数据库已关闭,我需要停止容器处理轮询中的当前记录并且不执行下一步 poll()。我已经实施 DataSourceHealthIndicator,在检查数据库状态为 UP 后,我想再次重启我的容器以处理剩余的记录。

关于如何停止处理剩余记录并停止容器的任何建议,我已尝试使用 consumer.close()。但它并没有停止这个过程并且一直在抛出消费者已经关闭。

致电container.stop()。如果您正在使用 @KafkaListeners,请在侦听器容器注册表上调用 stop(),这将停止所有已注册的容器。

编辑

automatically stopping the container 的错误处理程序从 2.1 版本开始可用;在撰写本文时,当前版本为 2.2.3。