为延迟队列实现处理 spring kafka 的轮询间隔的正确方法是什么?
What is the right way to handle poll interval for spring kafka for a delay queue implementation?
我正在用 kafka 实现一种延迟队列。 spring kafka listener 容器针对某个主题(例如 t1)收到的每条消息都应该延迟特定时间(例如 d 分钟),然后将消息发送回另一个主题(例如 t2 ).
目前我在 spring kafka 侦听器容器方法 (AcknowledgingConsumerAwareMessageListener):
中执行此操作
- 收到来自 t1 的消息
- 暂停侦听器容器
- 如果需要,睡 d 分钟
- 恢复侦听器容器
- 将消息发送到 t2
我知道心跳线程是一个不同的线程,不会受到上述步骤的影响,但轮询发生在与处理相同的线程中,并且在处理记录之后 。我已将我的 @KafkaListener 属性设置为 "max.poll.interval.ms=2xd",这样它就不会超时,但我从 KafkaEventListener 获得了 NonResponsiveConsumerEvent(带有 timeSinceLastPoll)。即使我没有为 @KafkaListener 属性设置 max.poll.interval.ms,我仍然会得到相同的 NonResponsiveConsumerEvent。在这两种情况下,消息都只处理一次并发送到 t2。
问题
- 如果侦听器容器暂停时 max.poll.interval.ms 内没有进行轮询,结果是什么?当容器没有暂停时怎么办? (我已将消费者配置为手动确认)
- 我是否应该生成一个单独的线程来休眠和恢复容器,从而释放容器处理线程以进行轮询?重要吗?
版本:Spring Boot 2.1.8,Spring Kafka 2.2.8
sleep for d minutes if required
您不能 "sleep" 消费者线程超过 max.poll.interval.ms
。
暂停容器的全部意义在于它会继续 poll
(但在恢复之前永远不会获得任何新记录)。
如果你真的让听者睡觉,那么暂停是没有意义的;你只需要适当增加max.poll.interval.ms
。
我正在用 kafka 实现一种延迟队列。 spring kafka listener 容器针对某个主题(例如 t1)收到的每条消息都应该延迟特定时间(例如 d 分钟),然后将消息发送回另一个主题(例如 t2 ). 目前我在 spring kafka 侦听器容器方法 (AcknowledgingConsumerAwareMessageListener):
中执行此操作- 收到来自 t1 的消息
- 暂停侦听器容器
- 如果需要,睡 d 分钟
- 恢复侦听器容器
- 将消息发送到 t2
我知道心跳线程是一个不同的线程,不会受到上述步骤的影响,但轮询发生在与处理相同的线程中,并且在处理记录之后
问题
- 如果侦听器容器暂停时 max.poll.interval.ms 内没有进行轮询,结果是什么?当容器没有暂停时怎么办? (我已将消费者配置为手动确认)
- 我是否应该生成一个单独的线程来休眠和恢复容器,从而释放容器处理线程以进行轮询?重要吗?
版本:Spring Boot 2.1.8,Spring Kafka 2.2.8
sleep for d minutes if required
您不能 "sleep" 消费者线程超过 max.poll.interval.ms
。
暂停容器的全部意义在于它会继续 poll
(但在恢复之前永远不会获得任何新记录)。
如果你真的让听者睡觉,那么暂停是没有意义的;你只需要适当增加max.poll.interval.ms
。