如何在 Spring Kafka 中确认 container.stop 上的特定偏移量?
How to acknowledge specific offset on container.stop in Spring Kafka?
基本上已经为登录用户打开 WebSocket 会话 id = 123 并且侦听器容器已创建并启动并记录来自主题 notification.user.123 已发送到客户端,但从未真正调用 acknowledgment.acknowledge();
。
最新查看的通知和相关偏移量已知。我不想存储和管理该数据并寻求抵消。当需要关闭 WebSocket 会话并停止侦听器容器时,我需要确认它。因此它将作为消费者读取的最新偏移量存储在代理中的某个位置。新的 WebSocket 会话和侦听器容器将自然地从该偏移量开始。
到目前为止,我尝试过即兴发挥 https://docs.spring.io/spring-kafka/reference/html/#seek 和
,但没有成功
private class MyListener extends AbstractConsumerSeekAware implements AcknowledgingConsumerAwareMessageListener<String, Notification> {
我是否需要查看一些侦听器容器生命周期和 atStop 事件?
如果您出于某种原因必须使用手动确认,因为您正在接收所有 topic/partition/offset 信息,您可以通过捕获 ConsumerStoppoingEvent
s 来提交消费者的偏移量。
The suggested solution is working perfectly with @KafkaListener
and @EventListener
but there are no events for programmatically created and started KafkaMessageListenerContainer.
要在 Spring 不管理容器时向事件侦听器获取事件,请调用 setApplicationEventPublisher(applictionContext)
(应用程序发布者是应用程序上下文,当 ApplicationEventListenerAware
bean 由Spring).
或者您可以简单地注入您自己的 ApplicationEventPublisher
实现并在那里处理事件。
基本上已经为登录用户打开 WebSocket 会话 id = 123 并且侦听器容器已创建并启动并记录来自主题 notification.user.123 已发送到客户端,但从未真正调用 acknowledgment.acknowledge();
。
最新查看的通知和相关偏移量已知。我不想存储和管理该数据并寻求抵消。当需要关闭 WebSocket 会话并停止侦听器容器时,我需要确认它。因此它将作为消费者读取的最新偏移量存储在代理中的某个位置。新的 WebSocket 会话和侦听器容器将自然地从该偏移量开始。
到目前为止,我尝试过即兴发挥 https://docs.spring.io/spring-kafka/reference/html/#seek 和
,但没有成功private class MyListener extends AbstractConsumerSeekAware implements AcknowledgingConsumerAwareMessageListener<String, Notification> {
我是否需要查看一些侦听器容器生命周期和 atStop 事件?
如果您出于某种原因必须使用手动确认,因为您正在接收所有 topic/partition/offset 信息,您可以通过捕获 ConsumerStoppoingEvent
s 来提交消费者的偏移量。
The suggested solution is working perfectly with
@KafkaListener
and@EventListener
but there are no events for programmatically created and started KafkaMessageListenerContainer.
要在 Spring 不管理容器时向事件侦听器获取事件,请调用 setApplicationEventPublisher(applictionContext)
(应用程序发布者是应用程序上下文,当 ApplicationEventListenerAware
bean 由Spring).
或者您可以简单地注入您自己的 ApplicationEventPublisher
实现并在那里处理事件。