RabbitMQ 和通道 Java 线程安全

RabbitMQ and channels Java thread safety

在本指南中 https://www.rabbitmq.com/api-guide.html RabbitMQ 人员声明:

Channels and Concurrency Considerations (Thread Safety)

Channel instances must not be shared between threads. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads. While some operations on channels are safe to invoke concurrently, some are not and will result in incorrect frame interleaving on the wire. Sharing channels between threads will also interfere with * Publisher Confirms.

线程安全非常重要,所以我尽量努力,但问题是:

我有这个应用程序可以接收来自 Rabbit 的消息。当收到一条消息时,它会对其进行处理,然后在完成后进行确认。在具有 2 个线程的固定线程池中,应用程序只能同时处理 2 个项目。 Rabbit 的 QOS 预取设置为 2,因为我不想为应用程序提供超过其在时间范围内可以处理的内容。

现在,我的消费者的 handleDelivery 执行以下操作:

Task run = new Task(JSON.parse(message));    
service.execute(new TestWrapperThread(getChannel(),run,envelope.getDeliveryTag()));

此时,您已经知道 TestWrapperThread 执行 channel.basicAck(deliveryTag, false); 调用作为最后一个操作。

根据我对文档的理解,这是不正确的并且可能有害,因为通道不是线程安全的,这种行为可能会把事情搞砸。但是那我该怎么办呢?我的意思是,我有一些想法,但它们肯定会让一切变得更复杂,我想弄清楚是否真的有必要。

提前致谢

我想您只为您的消费者使用 Channel,而不是为发布等其他操作使用

在你的情况下,唯一的潜在问题是:

channel.basicAck(deliveryTag, false);

因为你跨两个线程调用这个,顺便说一句,这个操作是安全的,如果你看到 java 代码:

classChannelN.java调用:

public void basicAck(long deliveryTag, boolean multiple)
   throws IOException
{
   transmit(new Basic.Ack(deliveryTag, multiple));
}

see github code for ChannelN.java

AMQChannel 中的 transmit 方法使用:

public void transmit(Method m) throws IOException {
   synchronized (_channelMutex) {
       transmit(new AMQCommand(m));
   }
}

_channelMutex 是一个 protected final Object _channelMutex = new Object();

使用 class 创建。 see github code for AMQChannel.java

编辑

正如您在官方文档中看到的那样,"some" 操作是线程安全的,现在不清楚是哪些。 我研究了代码,我认为跨多个线程调用ACK没有问题。

希望对您有所帮助。

EDIT2 我还添加了 Nicolas 的评论:

请注意,来自多个线程的消费 (basicConsume) 和确认是一种常见的 rabbitmq 模式,已被 java 客户端使用。

所以您可以放心使用。