如何使用JmsTemplate.sendAndReceive
How to use JmsTemplate.sendAndReceive
我想从 jmsTemplate.sendAndReceive 获得同步响应:
// Caller side: hands the payload and file metadata to the producer and keeps the reply message.
// NOTE(review): the reply may be null (presumably when nothing arrives within the receive timeout) — check before use.
Message responseMessage = producer.produceAndReceive(gzip, mestype, uploadFile.getName(), uploadFile.getAbsolutePath());
produceAndReceive 在另一个类中被调用:
/**
 * Sends the payload to {@code SAPPI_EXPORT_QUEUE} as a {@link BytesMessage} and waits for the reply.
 *
 * <p>No reply destination is set here on purpose: {@code JmsTemplate.sendAndReceive} creates its
 * own temporary reply queue after the creator returns, overwrites {@code JMSReplyTo} with it and
 * deletes it when done. A queue created in the creator would never be consumed from or deleted.
 *
 * @param data        raw bytes written into the message body
 * @param type        value stored in the {@code ISapProducer.IDOC_TYPE} property
 * @param name        value stored in the {@code ISapProducer.ORIGIN_FILE_NAME} property
 * @param archivePath value stored in the {@code ISapProducer.ARCHIVE_PATH} property
 * @return the reply message; NOTE(review): may be {@code null} when no reply arrives within the
 *         receive timeout — callers should check
 */
@Override
public Message produceAndReceive(final byte[] data, final String type, final String name, final String archivePath) {
    // Bound the wait for the reply (20000 ms) instead of blocking indefinitely.
    jmsTemplate.setReceiveTimeout(20000);
    return jmsTemplate.sendAndReceive(SAPPI_EXPORT_QUEUE, session -> {
        // One generated id is used both as a message property and as the JMS correlation id.
        final String msgId = UUIDGen.getUUID();
        final BytesMessage message = session.createBytesMessage();
        message.writeBytes(data);
        message.setStringProperty(ISapProducer.IDOC_TYPE, type);
        message.setStringProperty(ISapProducer.ORIGIN_FILE_NAME, name);
        message.setStringProperty(ISapProducer.MESSAGE_ID, msgId);
        message.setStringProperty(ISapProducer.ARCHIVE_PATH, archivePath);
        message.setJMSCorrelationID(msgId);
        return message;
    });
}
在这一步之后,我认为消息已经被放入队列。我有一个 @JmsListener 方法在监听这个队列:
/**
 * Listener for {@code myqueue.export}: forwards the request over HTTP and answers the sender
 * with the resulting status, sent to the request's reply-to destination.
 */
@Override
@JmsListener(destination = "myqueue.export")
public void consume(final Message message) throws ServerException {
    // some logic here
    final HttpStatus httpStatus = client.send(gzip, idocType, documentFileName, messageId, archivePath);
    // Reply to the destination named by the request, echoing its correlation id.
    jmsTemplate.send(message.getJMSReplyTo(), session -> {
        final Message reply = session.createTextMessage(httpStatus.toString());
        reply.setJMSCorrelationID(message.getJMSCorrelationID());
        return reply;
    });
    // some logic here
}
在这里,我向远程系统发送 HTTP 请求,并尝试把 httpStatus 放进 sendAndReceive() 方法的响应消息中。但 responseMessage 始终为 null,而且看起来整个过程是异步执行的。
我该如何解决?
在我这里可以正常工作……
/**
 * Minimal request/reply demo: the runner sends "foo" to queue {@code foo} and prints the reply,
 * while the listener on the same queue prints the request and answers "bar" on its reply-to.
 */
@SpringBootApplication
public class So53506177Application {

    private final SimpleMessageConverter converter = new SimpleMessageConverter();

    @Autowired
    private JmsTemplate jmsTemplate;

    public static void main(String[] args) {
        SpringApplication.run(So53506177Application.class, args);
    }

    /** Sends one request at startup and prints the converted reply. */
    @Bean
    public ApplicationRunner runner(JmsTemplate jmsTemplate) {
        return args -> {
            jmsTemplate.setReceiveTimeout(20000);
            Message received = jmsTemplate.sendAndReceive("foo", session -> {
                TextMessage request = session.createTextMessage("foo");
                request.setJMSCorrelationID("foo");
                return request;
            });
            Object replyPayload = this.converter.fromMessage(received);
            System.out.println("Reply: " + replyPayload);
        };
    }

    /** Prints the incoming request and sends "bar" to its reply-to destination. */
    @JmsListener(destination = "foo")
    public void consume(final Message message) throws Exception {
        Object requestPayload = this.converter.fromMessage(message);
        System.out.println("Received: " + requestPayload);
        jmsTemplate.send(message.getJMSReplyTo(), session -> {
            Message reply = session.createTextMessage("bar");
            // Echo the request's correlation id on the reply.
            reply.setJMSCorrelationID(message.getJMSCorrelationID());
            return reply;
        });
    }
}
输出结果:
Received: foo
Reply: bar
不过,虽然这不会影响结果,但你不应该自己创建 replyTo —— 模板会在 MessageCreator 返回之后创建它自己的临时回复队列(并从该队列接收响应),完成后还会把它删除:
/**
 * Sends a request message to the given destination and waits for a reply on a temporary
 * queue created for this call.
 *
 * @param session the JMS session used to create the queue, producer and consumer
 * @param destination the destination the request is sent to
 * @param messageCreator callback that builds the request message; must not be null
 * @return the message received from the temporary queue; annotated nullable
 *         (presumably null when nothing arrives within the receive timeout)
 * @throws JMSException if a JMS call made here fails
 */
@Nullable
protected Message doSendAndReceive(Session session, Destination destination, MessageCreator messageCreator)
throws JMSException {
Assert.notNull(messageCreator, "MessageCreator must not be null");
TemporaryQueue responseQueue = null;
MessageProducer producer = null;
MessageConsumer consumer = null;
try {
// The caller's creator runs first ...
Message requestMessage = messageCreator.createMessage(session);
responseQueue = session.createTemporaryQueue();
producer = session.createProducer(destination);
consumer = session.createConsumer(responseQueue);
// ... so any JMSReplyTo it set is overwritten here with the queue created above.
requestMessage.setJMSReplyTo(responseQueue);
if (logger.isDebugEnabled()) {
logger.debug("Sending created message: " + requestMessage);
}
doSend(producer, requestMessage);
// Wait for the reply using the configured receive timeout.
return receiveFromConsumer(consumer, getReceiveTimeout());
}
finally {
// Runs on both success and failure; only the queue created in this method is deleted.
JmsUtils.closeMessageConsumer(consumer);
JmsUtils.closeMessageProducer(producer);
if (responseQueue != null) {
responseQueue.delete();
}
}
}
你还可以简化你的监听器(listener):
/**
 * Simplified listener for queue {@code foo}: prints the converted request and returns "bar".
 * NOTE(review): relies on the listener infrastructure sending the return value as the reply — confirm.
 */
@JmsListener(destination = "foo")
public String consume(final Message message) throws Exception {
    final Object payload = this.converter.fromMessage(message);
    System.out.println("Received: " + payload);
    return "bar";
}
我想从 jmsTemplate.sendAndReceive 获得同步响应:
// Caller side: hands the payload and file metadata to the producer and keeps the reply message.
// NOTE(review): the reply may be null (presumably when nothing arrives within the receive timeout) — check before use.
Message responseMessage = producer.produceAndReceive(gzip, mestype, uploadFile.getName(), uploadFile.getAbsolutePath());
produceAndReceive 在另一个类中被调用:
@Override
public Message produceAndReceive(final byte[] data, final String type, final String name, final String archivePath) {
    // Sends the payload to SAPPI_EXPORT_QUEUE as a BytesMessage and waits for the reply.
    // NOTE(review): the returned Message may be null when no reply arrives within the timeout.
    //
    // No reply destination is set here on purpose: JmsTemplate.sendAndReceive creates its own
    // temporary reply queue after the creator returns, overwrites JMSReplyTo with it and deletes
    // it when done. A queue created in the creator would never be consumed from or deleted.

    // Bound the wait for the reply (20000 ms) instead of blocking indefinitely.
    jmsTemplate.setReceiveTimeout(20000);
    return jmsTemplate.sendAndReceive(SAPPI_EXPORT_QUEUE, session -> {
        // One generated id is used both as a message property and as the JMS correlation id.
        final String msgId = UUIDGen.getUUID();
        final BytesMessage message = session.createBytesMessage();
        message.writeBytes(data);
        message.setStringProperty(ISapProducer.IDOC_TYPE, type);
        message.setStringProperty(ISapProducer.ORIGIN_FILE_NAME, name);
        message.setStringProperty(ISapProducer.MESSAGE_ID, msgId);
        message.setStringProperty(ISapProducer.ARCHIVE_PATH, archivePath);
        message.setJMSCorrelationID(msgId);
        return message;
    });
}
在这一步之后,我认为消息已经被放入队列。我有一个 @JmsListener 方法在监听这个队列:
/**
 * Listener for {@code myqueue.export}: forwards the request over HTTP and answers the sender
 * with the resulting status, sent to the request's reply-to destination.
 */
@Override
@JmsListener(destination = "myqueue.export")
public void consume(final Message message) throws ServerException {
    // some logic here
    final HttpStatus httpStatus = client.send(gzip, idocType, documentFileName, messageId, archivePath);
    // Reply to the destination named by the request, echoing its correlation id.
    jmsTemplate.send(message.getJMSReplyTo(), session -> {
        final Message reply = session.createTextMessage(httpStatus.toString());
        reply.setJMSCorrelationID(message.getJMSCorrelationID());
        return reply;
    });
    // some logic here
}
在这里,我向远程系统发送 HTTP 请求,并尝试把 httpStatus 放进 sendAndReceive() 方法的响应消息中。但 responseMessage 始终为 null,而且看起来整个过程是异步执行的。
我该如何解决?
在我这里可以正常工作……
/**
 * Minimal request/reply demo: the runner sends "foo" to queue {@code foo} and prints the reply,
 * while the listener on the same queue prints the request and answers "bar" on its reply-to.
 */
@SpringBootApplication
public class So53506177Application {

    private final SimpleMessageConverter converter = new SimpleMessageConverter();

    @Autowired
    private JmsTemplate jmsTemplate;

    public static void main(String[] args) {
        SpringApplication.run(So53506177Application.class, args);
    }

    /** Sends one request at startup and prints the converted reply. */
    @Bean
    public ApplicationRunner runner(JmsTemplate jmsTemplate) {
        return args -> {
            jmsTemplate.setReceiveTimeout(20000);
            Message received = jmsTemplate.sendAndReceive("foo", session -> {
                TextMessage request = session.createTextMessage("foo");
                request.setJMSCorrelationID("foo");
                return request;
            });
            Object replyPayload = this.converter.fromMessage(received);
            System.out.println("Reply: " + replyPayload);
        };
    }

    /** Prints the incoming request and sends "bar" to its reply-to destination. */
    @JmsListener(destination = "foo")
    public void consume(final Message message) throws Exception {
        Object requestPayload = this.converter.fromMessage(message);
        System.out.println("Received: " + requestPayload);
        jmsTemplate.send(message.getJMSReplyTo(), session -> {
            Message reply = session.createTextMessage("bar");
            // Echo the request's correlation id on the reply.
            reply.setJMSCorrelationID(message.getJMSCorrelationID());
            return reply;
        });
    }
}
输出结果:
Received: foo
Reply: bar
不过,虽然这不会影响结果,但你不应该自己创建 replyTo —— 模板会在 MessageCreator 返回之后创建它自己的临时回复队列(并从该队列接收响应),完成后还会把它删除:
/**
 * Sends a request message to the given destination and waits for a reply on a temporary
 * queue created for this call.
 *
 * @param session the JMS session used to create the queue, producer and consumer
 * @param destination the destination the request is sent to
 * @param messageCreator callback that builds the request message; must not be null
 * @return the message received from the temporary queue; annotated nullable
 *         (presumably null when nothing arrives within the receive timeout)
 * @throws JMSException if a JMS call made here fails
 */
@Nullable
protected Message doSendAndReceive(Session session, Destination destination, MessageCreator messageCreator)
throws JMSException {
Assert.notNull(messageCreator, "MessageCreator must not be null");
TemporaryQueue responseQueue = null;
MessageProducer producer = null;
MessageConsumer consumer = null;
try {
// The caller's creator runs first ...
Message requestMessage = messageCreator.createMessage(session);
responseQueue = session.createTemporaryQueue();
producer = session.createProducer(destination);
consumer = session.createConsumer(responseQueue);
// ... so any JMSReplyTo it set is overwritten here with the queue created above.
requestMessage.setJMSReplyTo(responseQueue);
if (logger.isDebugEnabled()) {
logger.debug("Sending created message: " + requestMessage);
}
doSend(producer, requestMessage);
// Wait for the reply using the configured receive timeout.
return receiveFromConsumer(consumer, getReceiveTimeout());
}
finally {
// Runs on both success and failure; only the queue created in this method is deleted.
JmsUtils.closeMessageConsumer(consumer);
JmsUtils.closeMessageProducer(producer);
if (responseQueue != null) {
responseQueue.delete();
}
}
}
你还可以简化你的监听器(listener):
/**
 * Simplified listener for queue {@code foo}: prints the converted request and returns "bar".
 * NOTE(review): relies on the listener infrastructure sending the return value as the reply — confirm.
 */
@JmsListener(destination = "foo")
public String consume(final Message message) throws Exception {
    final Object payload = this.converter.fromMessage(message);
    System.out.println("Received: " + payload);
    return "bar";
}