Kafka - 使用高级消费者实现延迟队列

Kafka - Delayed Queue implementation using high level consumer

想要使用高级消费者实现延迟消费者api

主要思想:

关于此实现的一些问题:

  1. 提交每个偏移量可能会减慢 ZK 速度
  2. 可以consumer.commitOffsets抛出异常吗?如果是,我将使用相同的消息两次(可以用幂等消息解决)
  3. 等待很长时间没有提交偏移量的问题,例如延迟时间为 24 小时,将从迭代器获取下一个,休眠 24 小时,处理并提交(ZK 会话超时?)
  4. ZK 会话如何在不提交新偏移量的情况下保持活动状态? (设置一个hivezookeeper.session.timeout.ms可以在dead consumer中解析而不识别)
  5. 我还遗漏了其他问题吗?

谢谢!

解决此问题的一种方法是使用不同的主题,在该主题中推送所有要延迟的消息。如果所有延迟的消息都应该在相同的时间延迟后处理,这将是相当简单的:

while(it.hasNext()) {
    val message = it.next().message()
    
    if(shouldBeDelayed(message)) {
        val delay = 24 hours
        val delayTo = getCurrentTime() + delay
        putMessageOnDelayedQueue(message, delay, delayTo)
    }
    else {
       process(message)
    }

    consumer.commitOffset()
}

现在将尽快处理所有常规消息,而需要延迟的消息将放在另一个主题上。

好消息是我们知道延迟主题头部的消息是应该首先处理的消息,因为它的 delayTo 值将是最小的。因此,我们可以设置另一个读取头消息的消费者,检查时间戳是否在过去,如果是,则处理消息并提交偏移量。如果不是,它不会提交偏移量,而是一直睡到那个时间:

while(it.hasNext()) {
    val delayedMessage = it.peek().message()
    if(delayedMessage.delayTo < getCurrentTime()) {
        val readMessage = it.next().message
        process(readMessage.originalMessage)
        consumer.commitOffset()
    } else {
        delayProcessingUntil(delayedMessage.delayTo)
    }
}

如果有不同的延迟时间,您可以根据延迟对主题进行划分(例如 24 小时、12 小时、6 小时)。如果延迟时间比它更动态,它就会变得有点复杂。您可以通过引入两个延迟主题来解决它。读取延迟主题 A 之外的所有消息,并处理其 delayTo 值为过去的所有消息。在其他人中,您只需找到最接近 delayTo 的那个,然后将它们放在主题 B 上。休眠直到最接近的一个应该被处理并以相反的方式执行所有操作,即处理来自主题 B 的消息并将不应该处理的一次放回主题 A.

回答您的具体问题(有些问题已在您问题的评论中解决)

  1. Commit each offset might slow ZK down

您可以考虑切换到在 Kafka 中存储偏移量(从 0.8.2 开始可用的功能,请查看消费者配置中的 offsets.storage 属性)

  1. Can consumer.commitOffsets throw an exception? if yes, I will consume the same message twice (can solve with idempotent messages)

我相信它可以,例如,如果它不能与偏移存储通信。正如你所说,使用幂等消息可以解决这个问题。

  1. Problem waiting long time without committing the offset, for example delay period is 24 hours, will get next from iterator, sleep for 24 hours, process and commit (ZK session timeout?)

除非消息本身的处理时间超过会话超时时间,否则上述解决方案不会有问题。

  1. How can ZK session keep-alive without commit new offsets? (setting a hive zookeeper.session.timeout.ms can resolve in dead consumer without recognizing it)

再说一遍,您不需要设置很长的会话超时。

  1. Any other problems I'm missing?

总有 ;)

在你的情况下,我会建议另一条路线。

解决消费者主线程中的等待时间没有意义。这将是队列使用方式的反模式。从概念上讲,您需要尽可能快地处理消息并使队列保持低负载率。

相反,我会使用一个调度程序来为您需要延迟的每条消息安排作业。通过这种方式,您可以处理队列并创建将在预定义时间点触发的异步作业。

使用此技术的缺点是它对在内存中保存计划作业的 JVM 的状态敏感。如果该 JVM 失败,您将丢失计划的作业,并且您不知道该任务是否已执行。

虽然可以在群集环境中将其配置为 运行,但有调度程序实现,从而使您免受 JVM 崩溃的影响。

看看这个 java 调度框架:http://www.quartz-scheduler.org/

使用 Tibco EMS 或其他 JMS 队列。他们内置了重试延迟。 Kafka 可能不是您正在做的事情的正确设计选择

按时键控列表或其 redis 替代方案可能是最佳方法。

我们在一项任务中遇到了同样的问题。虽然最终在不使用延迟队列的情况下解决了这个问题,但是在探索解决方案时,我们发现最好的方法是使用 KafkaConsumer [=16= 提供的 pauseresume 功能].这种方法及其动机在这里得到了完美的描述:https://medium.com/naukri-engineering/retry-mechanism-and-delay-queues-in-apache-kafka-528a6524f722