RabbitMQ and channels Java thread safety

在本指南中 https://www.rabbitmq.com/api-guide.html RabbitMQ 人员声明:

Channels and Concurrency Considerations (Thread Safety)

Channel instances must not be shared between threads. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads. While some operations on channels are safe to invoke concurrently, some are not and will result in incorrect frame interleaving on the wire. Sharing channels between threads will also interfere with * Publisher Confirms.


我有这个应用程序可以接收来自 Rabbit 的消息。当收到一条消息时,它会对其进行处理,然后在完成后进行确认。在具有 2 个线程的固定线程池中,应用程序只能同时处理 2 个项目。 Rabbit 的 QOS 预取设置为 2,因为我不想为应用程序提供超过其在时间范围内可以处理的内容。

现在,我的消费者的 handleDelivery 执行以下操作:

Task run = new Task(JSON.parse(message));    
service.execute(new TestWrapperThread(getChannel(),run,envelope.getDeliveryTag()));

此时,您已经知道 TestWrapperThread 执行 channel.basicAck(deliveryTag, false); 调用作为最后一个操作。



我想您只为您的消费者使用 Channel,而不是为发布等其他操作使用


channel.basicAck(deliveryTag, false);

因为你跨两个线程调用这个,顺便说一句,这个操作是安全的,如果你看到 java 代码:


public void basicAck(long deliveryTag, boolean multiple)
   throws IOException
   transmit(new Basic.Ack(deliveryTag, multiple));

see github code for ChannelN.java

AMQChannel 中的 transmit 方法使用:

public void transmit(Method m) throws IOException {
   synchronized (_channelMutex) {
       transmit(new AMQCommand(m));

_channelMutex 是一个 protected final Object _channelMutex = new Object();

使用 class 创建。 see github code for AMQChannel.java


正如您在官方文档中看到的那样,"some" 操作是线程安全的,现在不清楚是哪些。 我研究了代码,我认为跨多个线程调用ACK没有问题。


EDIT2 我还添加了 Nicolas 的评论:

请注意,来自多个线程的消费 (basicConsume) 和确认是一种常见的 rabbitmq 模式,已被 java 客户端使用。
