Spring AMQP and the correlationId
I have some questions about Spring AMQP and the correlationId of an AMQP message.
I have a project with two queues ("queue.A" and "queue.B") and a MessageListener on each:
public class ServerHandlerQueueA implements MessageListener {
    @Override
    public void onMessage(Message msg) { … }
}
public class ServerHandlerQueueB implements MessageListener {
    @Override
    public void onMessage(Message msg) { … }
}
In some cases, when I receive a message on "queue.A", I have to redirect it to "queue.B":
rabbitTemplate.convertAndSend(routingkey, msg, new MessagePostProcessor()
{ …});
In all cases, I send the response to the client with:
String routingkey = msg.getMessageProperties().getReplyTo();
rabbitTemplate.convertAndSend(routingkey, respuesta, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message msg) throws AmqpException
{….}
});
This works fine if I use Spring AMQP on the client side:
Object _response = getRabbitOperations().convertSendAndReceive(requestExchange, routingKeyManagement, msg,
new MessagePostProcessor()
{
public Message postProcessMessage(Message message) throws AmqpException
{….}
});
But if I use the plain Java client (on the client side):
RpcClient _rpcClient = new RpcClient(channel, exchangeName, routingKey);
Response _response = _rpcClient.doCall(new AMQP.BasicProperties.Builder()
.contentType("application/json")
.deliveryMode(2)
.priority(1)
.userId("myUser")
.appId("MyApp")
.replyTo(replyQueueName)
.correlationId(corrId)
.type("NewOrder")
.build(),
messageBodyBytes);
I always get a NullPointerException at:
com.rabbitmq.client.RpcClient.handleDelivery(RpcClient.java:195)
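I think this is caused by the correlationId handling. When I send a message with Spring AMQP, I can see the "spring_listener_return_correlation" and "spring_request_return_correlation" headers in the consumer, but the "correlationId" property is always null.
How can I make this work with both the plain Java client and Spring AMQP? What am I doing wrong?
Thanks!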
------ EDIT ----------
I have upgraded to Spring AMQP 1.7.4.
I send the message like this:
Object respuesta = getRabbitOperations().convertSendAndReceive(requestExchange, routingKey, _object,
new MessagePostProcessor()
{
public Message postProcessMessage(Message message) throws AmqpException
{
message.getMessageProperties().setUserId("myUser");
message.getMessageProperties().setType("myType");
message.getMessageProperties().setAppId("myApp");
message.getMessageProperties().setMessageId(counter.incrementAndGet() + "-myType");
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
message.getMessageProperties().setRedelivered(false);
return message;
}
});
On my server:
@Override
public void onMessage(Message msg)
{
MessageProperties mp = msg.getMessageProperties();
Gson __gson = new Gson();
String _stringMP = __gson.toJson(mp);
System.out.println("MessageProperties:\n" + _stringMP);
}
I think the problem is that the correlationId I receive is always null:
{"headers":{"spring_listener_return_correlation":"49bd0a84-9abb-4719-b8a7-8668a4a77f32","spring_request_return_correlation":"32","__TypeId__":"MyType"},"messageId":"32-MyType","appId":"myApp","type":"MyType","replyTo":"amq.rabbitmq.reply-to.g2dkABByYWJiaXRATkRFUy1QQzAyAAAsMwAAAAgD.ia4+GgHgoeBnajbHxOgW+w\u003d\u003d","contentType":"application/json","contentEncoding":"UTF-8","contentLength":0,"contentLengthSet":false,"priority":0,"redelivered":false,"receivedExchange":"requestExchange","receivedRoutingKey":"inquiry","receivedUserId":"myUser",
"deliveryTag":5,"deliveryTagSet":true,"messageCount":0,"consumerTag":"amq.ctag-4H_P9CbWYZMML-QsmyaQYQ","consumerQueue":"inquiryQueue","receivedDeliveryMode":"NON_PERSISTENT"}
If I use the Java client, I can see the correlationId:
{"headers":{},"appId":"XBID","type":"MyOrders","correlationId":[49], ….
------ EDIT 2 ----------
I tried:
getRabbitOperations().convertAndSend(requestExchange, routingKeyInquiry,
_object,
new MessagePostProcessor()
{
public Message postProcessMessage(Message message) throws AmqpException
{
message.getMessageProperties().setUserId("myUser");
message.getMessageProperties().setType("myType");
message.getMessageProperties().setAppId("myApp");
message.getMessageProperties().setMessageId(counter.incrementAndGet() + "-myType");
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
message.getMessageProperties().setRedelivered(false);
message.getMessageProperties().setCorrelationIdString(UUID.randomUUID().toString());
return message;
}
});
But the "correlationId" is always null on the server side.
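What version are you using?
The return correlation headers have nothing to do with the correlationId; they are used to correlate returned (mandatory) requests and replies.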
As long as you copy the correlationId and replyTo from the queue.A message to the queue.B message, it should all work.
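To illustrate why those two properties matter, here is a minimal, dependency-free sketch. It is a simplified model, not the actual RpcClient or Spring AMQP code: the Msg and Client classes and the forwardToB and replyFrom helpers are hypothetical names made up for this example. It assumes that an RPC-style client keeps its pending calls keyed by correlationId, so a reply that arrives without one cannot be matched to any call. That is probably what surfaces as the NullPointerException in RpcClient.handleDelivery, though I have not verified it against that exact client version.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class CorrelationSketch {

    /** Hypothetical stand-in for an AMQP message; only the fields relevant here. */
    public static final class Msg {
        public final String correlationId;
        public final String replyTo;
        public final String body;

        public Msg(String correlationId, String replyTo, String body) {
            this.correlationId = correlationId;
            this.replyTo = replyTo;
            this.body = body;
        }
    }

    /** Hypothetical RPC client: pending calls are keyed by correlationId. */
    public static final class Client {
        private final Map<String, CompletableFuture<String>> pending = new HashMap<>();

        public CompletableFuture<String> call(String correlationId) {
            CompletableFuture<String> future = new CompletableFuture<>();
            pending.put(correlationId, future);
            return future;
        }

        /** Returns false when no pending call matches the reply's correlationId. */
        public boolean onReply(Msg reply) {
            CompletableFuture<String> future = pending.remove(reply.correlationId);
            if (future == null) {
                return false; // a client without this guard would fail at this point
            }
            future.complete(reply.body);
            return true;
        }
    }

    /** queue.A handler: forward to queue.B, keeping correlationId and replyTo. */
    public static Msg forwardToB(Msg fromA) {
        return new Msg(fromA.correlationId, fromA.replyTo, fromA.body);
    }

    /** queue.B handler: the reply carries the request's correlationId back. */
    public static Msg replyFrom(Msg request, String body) {
        return new Msg(request.correlationId, null, body);
    }

    public static void main(String[] args) {
        Client client = new Client();
        CompletableFuture<String> result = client.call("bar");

        Msg request = new Msg("bar", "reply.queue", "foo");
        Msg reply = replyFrom(forwardToB(request), "FOO");
        System.out.println("matched: " + client.onReply(reply));
        System.out.println("result: " + result.getNow("no reply"));

        // A reply that lost its correlationId cannot be matched to any call.
        System.out.println("matched: " + client.onReply(new Msg(null, null, "FOO")));
    }
}
```

In the real application the same idea applies to the MessageProperties of the message you forward and of the reply you send: whatever arrived in correlationId and replyTo has to leave with the message.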
If you can't figure it out, post the DEBUG logs from all 3 servers somewhere.
EDIT
This works fine for me...
@SpringBootApplication
public class So46316261Application implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(So46316261Application.class, args).close();
}
@Autowired
private RabbitTemplate template;
@Override
public void run(String... arg0) throws Exception {
Object reply = this.template.convertSendAndReceive("queue.A", "foo");
System.out.println(reply);
Connection conn = this.template.getConnectionFactory().createConnection();
Channel channel = conn.createChannel(false);
RpcClient client = new RpcClient(channel, "", "queue.A");
Response response = client.doCall(new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2)
.priority(1)
.userId("guest")
.appId("MyApp")
.replyTo("amq.rabbitmq.reply-to")
.correlationId("bar")
.type("NewOrder")
.build(),
"foo".getBytes());
System.out.println(new String(response.getBody()));
channel.close();
conn.close();
}
@Bean
public Queue queueA() {
return new Queue("queue.A");
}
@Bean
public Queue queueB() {
return new Queue("queue.B");
}
@RabbitListener(queues = "queue.A")
public void listen(Message in) {
System.out.println(in);
this.template.send("queue.B", in);
}
@RabbitListener(queues = "queue.B")
public String listenB(Message in) {
System.out.println(in);
return "FOO";
}
}
(Body:'foo' MessageProperties [headers={}, correlationId=1, replyTo=amq.rabbitmq.reply-to.g2dkABByYWJiaXRAbG9jYWxob3N0AAACyAAAAAAB.hp0xZxgVpXcuj9+5QkcOOw==, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=queue.B, deliveryTag=1, consumerTag=amq.ctag-oanHvT3YyUb_Lajl0gpZSQ, consumerQueue=queue.B])
FOO
(Body:'foo' MessageProperties [headers={}, appId=MyApp, type=NewOrder, correlationId=1, replyTo=amq.rabbitmq.reply-to.g2dkABByYWJiaXRAbG9jYWxob3N0AAACzAAAAAAB.okm02YXf0s0HdqZynVIn2w==, contentType=text/plain, contentLength=0, priority=1, redelivered=false, receivedExchange=, receivedRoutingKey=queue.B, deliveryTag=2, consumerTag=amq.ctag-oanHvT3YyUb_Lajl0gpZSQ, consumerQueue=queue.B])
FOO