您如何处理消息队列中出现乱序的消息?

How do you address messages coming out of order in a message queue?

我曾经在一次采访中被问到,你会如何处理消息队列中乱序的消息。已经有一段时间了,我还没有找到明确的答案,我想知道是否有该领域的专家可以帮助我回答它以解决我自己的好奇心。

我了解某些消息队列提供恰好一次和 FIFO 保证。我也知道流系统中事件时间和处理时间的概念。例如,在像 Kafka 这样的基于日志的消息队列中,由于存在偏移量和消息持久性(我可能是错的),混合排序可能不太可能发生。我还考虑过使用时间戳,要求每个消息发送者在发送消息之前记录消息的时间,但由于时钟偏差,这充满了不一致。

考虑到所有这些,我想知道一个地址如何在 AMQP、JMS 或 RabbitMQ 等传统消息传递系统中混淆排序,在这些系统中,可能有十几个物联网设备正在发送消息,而我作为消费者想要协调它们顺序正确。

我可以回答关于 Apache Kafka 的问题。 Apache Kafka 通过分区保证主题的严格顺序意味着每个分区都是按严格顺序附加的不可变消息序列。 因此,以防万一,多个分区消费者可能会使用来自多个分区的消息,而这些消息不能按严格顺序排列。我们可以考虑以下 2 个选项来实现严格的顺序。

  1. 如果按顺序查找 1 个生产者消息,每个主题仅使用 1 个分区。因此生产者将按顺序在同一分区上发布,消费者将按严格顺序消费。

  1. 生产者向多分区发布消息,因此在消费者组中使用多消费者,但使用分配给每个消费者的特定分区来消费来自特定分区的消息将保证每个消费者每个分区的严格顺序

如果您的系统正在使用队列,提供有序的消息保证,那么只需使用该通道(如 kakfa 的单分区,某些设置下的 AMQP)。 但是,如果您的系统使用的队列不提供 严格排序 那么一般的想法是客户端可以 单调 增加 [1] 数字(或时间戳)附加到它发送到队列的每条消息。这构成了生产者打算发送给其接收者的序列的基础。

如何获得单调递增的值:

使用时间戳: POSIX clock_gettime() 函数和 CLOCK_MONOTONIC[2] 提供了获取单调递增时间戳的选项,生产者可以使用它在每条消息上放置时间戳。当接收方看到接收到的消息的时间戳早于最新消息时,接收方可以识别出乱序消息。

使用序列号: 在发送每条消息之前,您可以简单地增加一个原子计数器并将计数器值附加到每条消息,以便接收者可以了解预期的顺序。这样就形成了严格递增的序列。方法与 Lamport 的逻辑时钟[3]非常相似,后者为生产者提供虚拟时钟。

在接收方处理乱序消息: 这几乎是特定于应用程序的,但通常当消息乱序到达时您有 2 个选项: a) 丢弃旧消息,例如在接收方必须显示股票最新价值的情况下。 b) 有缓冲区来重新排序序列,就像在 TCP 连接中一样(例如,zookeeper 使用 TCP 作为队列进行 FIFO 排序 [4-5])

工具: 如果您没有为消息添加时间戳,则将所有消息从生产者发送到 Apache kafka single partition in sequence,因为这将确保接收者可以按顺序接收消息。

如果您使用的消息系统不能保证按顺序交付(例如某些设置下的 AMQP[6]),那么您可以考虑为每条消息添加额外的单调递增 number/clock。

[1] https://en.wiktionary.org/wiki/monotonic_increasing#targetText=Adjective,contrast%20this%20with%20strictly%20increasing

[2]https://linux.die.net/man/2/clock_gettime

[3]https://en.wikipedia.org/wiki/Lamport_timestamps#Lamport's_logical_clock_in_distributed_systems

[4]https://cwiki.apache.org/confluence/download/attachments/24193445/zookeeper-internals.pdf?version=1&modificationDate=1295034038000&api=v2

[5]http://www.tcs.hut.fi/Studies/T-79.5001/reports/2012-deSouzaMedeiros.pdf

[6]RabbitMQ - Message order of delivery