Spring amqp 和 correlatinId

Spring amqp and the correlatinId

我对 Spring AMQP 和 AMQP 消息的 correlationId 有一些疑问。 我有一个项目,其中有两个 queues(“queue.A”和“queue.B”)和一个 MessageListener:

public class ServerHandlerQueueA implements MessageListener {

    @Override
    public void onMessage(Message msg)


public class ServerHandlerQueueB implements MessageListener {

    @Override
    public void onMessage(Message msg)

在某些情况下,当我在“queue.A”中收到消息时,我必须将其重定向到“queue.B”:

rabbitTemplate.convertAndSend(routingkey, msg, new MessagePostProcessor() 
    { …});

在所有情况下,我都会使用以下方式向客户端发送响应:

String routingkey = msg.getMessageProperties().getReplyTo();
rabbitTemplate.convertAndSend(routingkey, respuesta, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message msg) throws AmqpException 
{….}
});

如果我在客户端使用 Spring AMQP,这会正常工作:

Object _response = getRabbitOperations().convertSendAndReceive(requestExchange, routingKeyManagement, msg,
new MessagePostProcessor() 
        {
            public Message postProcessMessage(Message message) throws AmqpException 
            {….}
        });

但是如果我使用 java 客户端(在客户端):

RpcClient _rpcClient = new RpcClient(channel, exchangeName, routingKey);
        Response _response = _rpcClient.doCall(new AMQP.BasicProperties.Builder()
                   .contentType("application/json")
                   .deliveryMode(2)
                   .priority(1)
                   .userId("myUser")
                   .appId("MyApp")
                   .replyTo(replyQueueName)
                   .correlationId(corrId)
                   .type("NewOrder")
                   .build(), 
                   messageBodyBytes);

我总是在以下位置收到 NullPointerException:

com.rabbitmq.client.RpcClient.handleDelivery(RpcClient.java:195)

我认为这是因为 correlationId 处理。当我使用 Spring AMQP 发送消息时,我可以在消费者中看到“spring_listener_return_correlation”和“spring_request_return_correlation”headers,但是“correlationId”属性始终为空。 我怎样才能使它与纯 java 客户端和 Spring AMQP 兼容?我做错了什么? 谢谢!

------ 编辑 ---------- 我已经升级到 Spring AMQP 1.7.4 版本。 我发送这样的消息:

Object respuesta = getRabbitOperations().convertSendAndReceive(requestExchange, routingKey, _object, 
            new MessagePostProcessor() 
            {
                public Message postProcessMessage(Message message) throws AmqpException 
                {
                    message.getMessageProperties().setUserId(“myUser”);
                    message.getMessageProperties().setType(“myType”);
                    message.getMessageProperties().setAppId("myApp");
                    message.getMessageProperties().setMessageId(counter.incrementAndGet() + "-myType");
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
                    message.getMessageProperties().setRedelivered(false);

                    return message;
                }
            });

在我的服务器上:

@Override
    public void onMessage(Message msg) 
    {
        MessageProperties mp = msg.getMessageProperties();
Gson __gson = new Gson();
                    String _stringMP = __gson.toJson(mp);
                    System.out.println("MessageProperties:\n" + _stringMP);
}

而且我认为问题是我总是得到 correlationId null:

{"headers":{"spring_listener_return_correlation":"49bd0a84-9abb-4719-b8a7-8668a4a77f32","spring_request_return_correlation":"32","__TypeId__":"MyType"},"messageId":"32-MyType","appId":"myApp","type":"MyType","replyTo":"amq.rabbitmq.reply-to.g2dkABByYWJiaXRATkRFUy1QQzAyAAAsMwAAAAgD.ia4+GgHgoeBnajbHxOgW+w\u003d\u003d","contentType":"application/json","contentEncoding":"UTF-8","contentLength":0,"contentLengthSet":false,"priority":0,"redelivered":false,"receivedExchange":"requestExchange","receivedRoutingKey":"inquiry","receivedUserId":"myUser",
"deliveryTag":5,"deliveryTagSet":true,"messageCount":0,"consumerTag":"amq.ctag-4H_P9CbWYZMML-QsmyaQYQ","consumerQueue":"inquiryQueue","receivedDeliveryMode":"NON_PERSISTENT"}

如果我使用 Java 客户端,我可以看到 correlationId:

{"headers":{},"appId":"XBID","type":"MyOrders","correlationId":[49], ….

------------ 编辑 2 -------------------------- ------
我试过:

getRabbitOperations().convertAndSend(requestExchange, routingKeyInquiry, 
                _object, 
                new MessagePostProcessor() 
                {
                    public Message postProcessMessage(Message message) throws AmqpException 
                    {
                        message.getMessageProperties().setUserId(“myUser”);
                        message.getMessageProperties().setType(“myType”);
                        message.getMessageProperties().setAppId("myApp");
                        message.getMessageProperties().setMessageId(counter.incrementAndGet() + "-myType");
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
                        message.getMessageProperties().setRedelivered(false);
                        message.getMessageProperties().setCorrelationIdString(UUID.randomUUID().toString());
                        return message;
                    }
                });

但是 "correlationId" 在服务器端总是空的。

您使用的是什么版本?

return相关头与correlationId无关;它们用于关联 returned(强制)请求和回复。

只要将queue.A消息中的correlationIdreplyTo复制到queue.B消息中,应该就可以了。

如果您无法弄清楚,post 从某处调试所有 3 个服务器的日志。

编辑

这对我来说很好...

@SpringBootApplication
public class So46316261Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(So46316261Application.class, args).close();
    }

    @Autowired
    private RabbitTemplate template;

    @Override
    public void run(String... arg0) throws Exception {
        Object reply = this.template.convertSendAndReceive("queue.A", "foo");
        System.out.println(reply);
        Connection conn = this.template.getConnectionFactory().createConnection();
        Channel channel = conn.createChannel(false);
        RpcClient client = new RpcClient(channel, "", "queue.A");
        Response response = client.doCall(new AMQP.BasicProperties.Builder()
                .contentType("text/plain")
                .deliveryMode(2)
                .priority(1)
                .userId("guest")
                .appId("MyApp")
                .replyTo("amq.rabbitmq.reply-to")
                .correlationId("bar")
                .type("NewOrder")
                .build(),
                "foo".getBytes());
        System.out.println(new String(response.getBody()));
        channel.close();
        conn.close();
    }

    @Bean
    public Queue queueA() {
        return new Queue("queue.A");
    }

    @Bean
    public Queue queueB() {
        return new Queue("queue.B");
    }

    @RabbitListener(queues = "queue.A")
    public void listen(Message in) {
        System.out.println(in);
        this.template.send("queue.B", in);
    }

    @RabbitListener(queues = "queue.B")
    public String listenB(Message in) {
        System.out.println(in);
        return "FOO";
    }

}


(Body:'foo' MessageProperties [headers={}, correlationId=1, replyTo=amq.rabbitmq.reply-to.g2dkABByYWJiaXRAbG9jYWxob3N0AAACyAAAAAAB.hp0xZxgVpXcuj9+5QkcOOw==, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=queue.B, deliveryTag=1, consumerTag=amq.ctag-oanHvT3YyUb_Lajl0gpZSQ, consumerQueue=queue.B])
FOO
(Body:'foo' MessageProperties [headers={}, appId=MyApp, type=NewOrder, correlationId=1, replyTo=amq.rabbitmq.reply-to.g2dkABByYWJiaXRAbG9jYWxob3N0AAACzAAAAAAB.okm02YXf0s0HdqZynVIn2w==, contentType=text/plain, contentLength=0, priority=1, redelivered=false, receivedExchange=, receivedRoutingKey=queue.B, deliveryTag=2, consumerTag=amq.ctag-oanHvT3YyUb_Lajl0gpZSQ, consumerQueue=queue.B])
FOO