RabbitMQ 主题交换消息排序

RabbitMQ topic exchange message ordering

RabbitMQ specification中可以找到:

Section 4.7 of the AMQP 0-9-1 core specification explains the conditions under which ordering is guaranteed: messages published in one channel, passing through one exchange and one queue and one outgoing channel will be received in the same order that they were sent. RabbitMQ offers stronger guarantees since release 2.7.0.

但是如果有像 Exchange 1 -> Exchange 2 -> Queue 1.

这样的绑定呢?

还能保证下单吗?

我们假设它但我们在我们的应用程序中发现情况可能并非如此。我们使用 spring-rabbit-2.1.6-RELEASE(它使用 amqp-client-5.4.3)。

发布者、绑定和消费者如下:

Client 1 publishes to Exchange 1 -> Exchange 2 -> Queue 1 - consumed by Client 2
                                 -> Queue 2 - consumed by Client 3

我们可以看到Client 1按以下顺序发布了3条消息:

但是Client 2和Client 3都按以下顺序收到消息:

编辑 1(Spring 配置)

对于发布者 (Client 1),使用了以下 XML 配置(没有在 rabbit 的 ConnectionFactory 上设置额外的属性):

<rabbit:connection-factory channel-cache-size="1" cache-mode="CHANNEL" id="respConnFactory" addresses="..." virtual-host="..." username="..." password="..." executor="connExec"/>
<!-- the executor has no meaning for such usingas mentioned by Gary -->

发布是通过以下方式完成的:

AmqpTemplate::send(String exchange, String routingKey, Message message)

专用线程中

Client 2 使用默认 spring 配置 SimpleMessageListenerContainer.

Client 3 实际上不是我们的应用程序,所以我不知道真正的设置。就是他们向我们报告了消息排序不正确的错误。

当然,我们记录的消息发布仍有可能存在一些错误。但我对它进行了三重检查 - 它来自单个线程,并且每条消息的自定义 header 中都有序列号,该序列号在 Client 1.

上正确递增

编辑 2

我做了 进一步分析 以找出错误邮件排序发生的频率。结果如下:

我在事件前后的 +-2 小时内记录了日志和数据(总共 4 小时)并且发送了 42706 条消息其中只有 3 个在 Client 2 上排序错误。所有 3 条消息都在 7 毫秒的间隔内发送。

然后我又随机拍了另一次window,时长14小时。已发送 531904 条消息,Client 2 正确的顺序 接收了所有消息 。平均消息速率约为每秒 11 条消息。

消息分布不均匀所以 7 毫秒内的 3 条消息没什么特别的 - 恰恰相反。通常在 3-5 毫秒内发送多条消息。

根据这个分析,我假设 rabbit 集群上发生了一些奇怪的事情。不幸的是,我没有它的日志了。

某种竞争条件的机会在我看来非常低.

谢谢,

弗兰克

Spring AMQP 使用通道缓存;在多线程环境中,不能保证同一个线程总是使用同一个通道;因此不能保证订购。

对于当前版本,解决方案是使用 scoped operations,这将保证一系列出版物将出现在同一频道并保证顺序。

在下一个版本(2.3,今年晚些时候可用)中,我们还添加了做同样事情的 ThreadChannelConnectionFactory

它又发生了,我们终于弄明白了。

一直以来都是兔子健康指标负责频道的重新创建,因此错误的订单排序。有一个定期调用健康端点的工作。

正如加里正确提到的:

Spring AMQP uses a cache for channels; in a multi-threaded environment, there is no guarantee that the same thread will always use the same channel; hence ordering is not guaranteed.

从不同的线程检查健康状态,它使用生产者的通道。

作为短期解决方案,这将起作用:

management.health.rabbit.enabled=false

如果生产者真的是单线程并且连接工厂按照描述设置,那么排序是有保证的。

另一个(也许是正确的)解决方案是创建单独的ConnectionFactory并且不使用自动配置进行兔子健康检查.

@Bean("rabbitHealthIndicator")
public HealthIndicator rabbitHealthIndicator(ConnectionFactory healthCheckConnectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(healthCheckConnectionFactory); // make sure it's a different connection factory than the one with guaranteed sorting
    return new RabbitHealthIndicator(rabbitTemplate);
}

成功了。

干杯,谢谢加里的帮助。

弗兰克