如何从 spring ListenerContainerIdleEvent 执行手动偏移确认

How to execute manual offset acknowledge from spring ListenerContainerIdleEvent

我有一个 Kafka 监听器,它实现了具有以下属性的确认消息监听器接口:

  1. ackMode - MANUAL_IMMEDIATE
  2. 空闲事件间隔 - 3 分钟

在侦听器上使用消息时,它决定是否通过 acknowledgment.acknowledge() 确认特定记录,并且它按预期工作。

此外,我有一个场景可以在 X 分钟后(如果没有消息到达)确认最后的偏移量编号(将其保存在内存中)。 为了克服这个要求,我决定使用根据我的配置每 3 分钟触发一次的 ListenerContainerIdleEvent。

我的问题是:

  1. 有什么方法可以将 Kafka 偏移量确认为空闲事件的触发器吗?空闲事件包含对 KafkaMessageListenerContainer 的引用,但它封装了持有 KafkaConsumer 的 ListenerConsumer。

  2. 空闲消息事件发送是否同步(与KafkaListenerConsumer同一个线程)?从代码来看,默认实现是 SimpleApplicationEventMulticaster,它在没有 TaskExecutor 的情况下进行初始化,因此它会在同一线程上调用侦听器。你能批准吗?

我正在使用 spring-kafka 1.3.9。

  1. 是的,只是保留对最后一个Acknowledgment的引用,然后再次调用acknowledge()

  2. 是的,事件默认发布在消费者线程

即使事件是在不同的线程(多播器中的执行器)上发布的,它仍然应该工作,因为不是直接提交,提交将在消费者从轮询中醒来时排队并由其处理。

参见processAck()中的逻辑。

在较新的版本中(从 2.0 开始),事件具有对消费者的引用,因此您可以直接与其交互(获取当前位置并再次提交),只要事件发布在消费者上线程。