如何从 spring ListenerContainerIdleEvent 执行手动偏移确认
How to execute manual offset acknowledge from spring ListenerContainerIdleEvent
我有一个 Kafka 监听器,它实现了具有以下属性的确认消息监听器接口:
- ackMode - MANUAL_IMMEDIATE
- 空闲事件间隔 - 3 分钟
在侦听器上使用消息时,它决定是否通过 acknowledgment.acknowledge() 确认特定记录,并且它按预期工作。
此外,我有一个场景可以在 X 分钟后(如果没有消息到达)确认最后的偏移量编号(将其保存在内存中)。
为了克服这个要求,我决定使用根据我的配置每 3 分钟触发一次的 ListenerContainerIdleEvent。
我的问题是:
有什么方法可以将 Kafka 偏移量确认为空闲事件的触发器吗?空闲事件包含对 KafkaMessageListenerContainer 的引用,但它封装了持有 KafkaConsumer 的 ListenerConsumer。
空闲消息事件发送是否同步(与KafkaListenerConsumer同一个线程)?从代码来看,默认实现是 SimpleApplicationEventMulticaster,它在没有 TaskExecutor 的情况下进行初始化,因此它会在同一线程上调用侦听器。你能批准吗?
我正在使用 spring-kafka 1.3.9。
是的,只是保留对最后一个Acknowledgment
的引用,然后再次调用acknowledge()
。
是的,事件默认发布在消费者线程
即使事件是在不同的线程(多播器中的执行器)上发布的,它仍然应该工作,因为不是直接提交,提交将在消费者从轮询中醒来时排队并由其处理。
参见processAck()
中的逻辑。
在较新的版本中(从 2.0 开始),事件具有对消费者的引用,因此您可以直接与其交互(获取当前位置并再次提交),只要事件发布在消费者上线程。
我有一个 Kafka 监听器,它实现了具有以下属性的确认消息监听器接口:
- ackMode - MANUAL_IMMEDIATE
- 空闲事件间隔 - 3 分钟
在侦听器上使用消息时,它决定是否通过 acknowledgment.acknowledge() 确认特定记录,并且它按预期工作。
此外,我有一个场景可以在 X 分钟后(如果没有消息到达)确认最后的偏移量编号(将其保存在内存中)。 为了克服这个要求,我决定使用根据我的配置每 3 分钟触发一次的 ListenerContainerIdleEvent。
我的问题是:
有什么方法可以将 Kafka 偏移量确认为空闲事件的触发器吗?空闲事件包含对 KafkaMessageListenerContainer 的引用,但它封装了持有 KafkaConsumer 的 ListenerConsumer。
空闲消息事件发送是否同步(与KafkaListenerConsumer同一个线程)?从代码来看,默认实现是 SimpleApplicationEventMulticaster,它在没有 TaskExecutor 的情况下进行初始化,因此它会在同一线程上调用侦听器。你能批准吗?
我正在使用 spring-kafka 1.3.9。
是的,只是保留对最后一个
Acknowledgment
的引用,然后再次调用acknowledge()
。是的,事件默认发布在消费者线程
即使事件是在不同的线程(多播器中的执行器)上发布的,它仍然应该工作,因为不是直接提交,提交将在消费者从轮询中醒来时排队并由其处理。
参见processAck()
中的逻辑。
在较新的版本中(从 2.0 开始),事件具有对消费者的引用,因此您可以直接与其交互(获取当前位置并再次提交),只要事件发布在消费者上线程。