如何在 Spring Kafka 中确认 container.stop 上的特定偏移量?

How to acknowledge specific offset on container.stop in Spring Kafka?

基本上已经为登录用户打开 WebSocket 会话 id = 123 并且侦听器容器已创建并启动并记录来自主题 notification.user.123 已发送到客户端,但从未真正调用 acknowledgment.acknowledge();

最新查看的通知和相关偏移量已知。我不想存储和管理该数据并寻求抵消。当需要关闭 WebSocket 会话并停止侦听器容器时,我需要确认它。因此它将作为消费者读取的最新偏移量存储在代理中的某个位置。新的 WebSocket 会话和侦听器容器将自然地从该偏移量开始。

到目前为止,我尝试过即兴发挥 https://docs.spring.io/spring-kafka/reference/html/#seek

,但没有成功
private class MyListener extends AbstractConsumerSeekAware implements AcknowledgingConsumerAwareMessageListener<String, Notification> {

我是否需要查看一些侦听器容器生命周期和 atStop 事件?

如果您出于某种原因必须使用手动确认,因为您正在接收所有 topic/partition/offset 信息,您可以通过捕获 ConsumerStoppoingEvents 来提交消费者的偏移量。

The suggested solution is working perfectly with @KafkaListener and @EventListener but there are no events for programmatically created and started KafkaMessageListenerContainer.

要在 Spring 不管理容器时向事件侦听器获取事件,请调用 setApplicationEventPublisher(applictionContext)(应用程序发布者是应用程序上下文,当 ApplicationEventListenerAware bean 由Spring).

或者您可以简单地注入您自己的 ApplicationEventPublisher 实现并在那里处理事件。