更新:Spring IBM MQ 系列上的引导 JMS 静态回复队列
Update: Spring Boot JMS static reply queue on IBM MQ Series
在我的用例中,我需要通过托管队列对远程系统进行请求-回复调用。使用 Spring Boot 和 IBM 的 MQ starter 我遇到了应用程序想要创建 dynamic/temporary 回复队列而不是使用已经存在的托管队列的问题。
配置在这里设置
@EnableJms
@Configuration
public class QueueConfiguration {
@Bean
public MQQueueConnectionFactory connectionFactory() throws JMSException {
MQQueueConnectionFactory factory = new MQQueueConnectionFactory();
factory.setTransportType(CT_WMQ); // is 1
factory.setHostName(queueProperties.getHost());
factory.setPort(queueProperties.getPort());
factory.setChannel(queueProperties.getChannel()); // combo of ${queueManager}%${channel}
return factory;
}
@Bean
public JmsMessagingTemplate messagingTemplate(ConnectionFactory connectionFactory) {
JmsMessagingTemplate jmt = new JmsMessagingTemplate(connectionFactory);
jmt.setDefaultDestinationName(queueProperties.getQueueName());
return jmt;
}
@Bean
public Jaxb2Marshaller marshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setPackagesToScan("com.foo.model");
return marshaller;
}
@Bean
public MessageConverter messageConverter(Jaxb2Marshaller marshaller) {
MarshallingMessageConverter converter = new MarshallingMessageConverter();
converter.setMarshaller(marshaller);
converter.setUnmarshaller(marshaller);
return converter;
}
}
用法非常简单:将对象转换并发送。等待响应接收
并转换它。
@Component
public class ExampleSenderReceiver {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Override
@SneakyThrows
public ResponseExample sendAndReceive(RequestExample request, String correlationId) {
MessagePostProcessor mpp = message -> {
message = MessageBuilder.fromMessage(message)
.setHeader(JmsHeaders.CORRELATION_ID, correlationId)
// .setHeader(JmsHeaders.REPLY_TO, "DEV.QUEUE.3") this triggers queue creation
.build();
return message;
};
String destination = Objects.requireNonNull(jmsMessagingTemplate.getDefaultDestinationName());
return jmsMessagingTemplate.convertSendAndReceive(destination, request, ResponseExample.class, mpp);
}
我已经阅读了很多 IBM 文档并认为,我需要将 message type 设置为“MQMT_REQUEST”,但我没有找到合适的位置。
更新
添加了 Spring 集成为 并添加了 JmsOutboundGateway 的配置
@Bean
public MessageChannel requestChannel() {
return new DirectChannel();
}
@Bean
public QueueChannel responseChannel() {
return new QueueChannel();
}
@Bean
@ServiceActivator(inputChannel = "requestChannel" )
public JmsOutboundGateway jmsOutboundGateway( ConnectionFactory connectionFactory) {
JmsOutboundGateway gateway = new JmsOutboundGateway();
gateway.setConnectionFactory(connectionFactory);
gateway.setRequestDestinationName("REQUEST");
gateway.setReplyDestinationName("RESPONSE");
gateway.setReplyChannel(responseChannel());
gateway.setCorrelationKey("JMSCorrelationID*");
gateway.setIdleReplyContainerTimeout(2, TimeUnit.SECONDS);
return gateway;
}
并调整了我的 ExampleSenderReceiver class
@Autowired
@Qualifier("requestChannel")
private MessageChannel requestChannel;
@Autowired
@Qualifier("responseChannel")
private QueueChannel responseChannel;
@Override
@SneakyThrows
public ResponseExample sendAndReceive(RequestExample request, String correlationId) {
String xmlContent = "the marshalled request object";
Map<String, Object> header = new HashMap<>();
header.put(JmsHeaders.CORRELATION_ID, correlationId);
GenericMessage<String> message1 = new GenericMessage<>(xmlContent, header);
requestChannel.send(message1);
log.info("send done" );
Message<?> receive = responseChannel.receive(1500);
if(null != receive){
log.info("incoming: {}", receive.toString());
}
}
重要的部分是gateway.setCorrelationKey("JMSCorrelationID*");
没有该行,correlationId 传播不正确。
下一步是重新添加 MessageConverters 并使其再次美观。
谢谢。
您缺少队列管理器。
ibm:
mq:
queueManager: QM1
channel: chanel
connName: localhost(1414)
user: admin
password: admin
默认的 JmsTemplate(由 JmsMessagingTemplate
使用)始终使用临时回复队列。您可以将其子类化并覆盖 doSendAndReceive(Session session, Destination destination, MessageCreator messageCreator)
以改为使用您的托管队列。
但是,它只有在您一次有一个未完成的请求时才有效(例如,所有 运行 在一个线程上)。您还必须通过检查相关 ID 添加代码以丢弃“迟到”到达。
您可以改用异步发送并处理侦听器容器上的回复并将回复与请求相关联。
考虑改用 spring-integration-jms
及其出站网关 - 它在回复队列处理方面具有更大的灵活性(并为您完成所有关联)。
https://docs.spring.io/spring-integration/reference/html/jms.html#jms-outbound-gateway
在我的用例中,我需要通过托管队列对远程系统进行请求-回复调用。使用 Spring Boot 和 IBM 的 MQ starter 我遇到了应用程序想要创建 dynamic/temporary 回复队列而不是使用已经存在的托管队列的问题。
配置在这里设置
@EnableJms
@Configuration
public class QueueConfiguration {
@Bean
public MQQueueConnectionFactory connectionFactory() throws JMSException {
MQQueueConnectionFactory factory = new MQQueueConnectionFactory();
factory.setTransportType(CT_WMQ); // is 1
factory.setHostName(queueProperties.getHost());
factory.setPort(queueProperties.getPort());
factory.setChannel(queueProperties.getChannel()); // combo of ${queueManager}%${channel}
return factory;
}
@Bean
public JmsMessagingTemplate messagingTemplate(ConnectionFactory connectionFactory) {
JmsMessagingTemplate jmt = new JmsMessagingTemplate(connectionFactory);
jmt.setDefaultDestinationName(queueProperties.getQueueName());
return jmt;
}
@Bean
public Jaxb2Marshaller marshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setPackagesToScan("com.foo.model");
return marshaller;
}
@Bean
public MessageConverter messageConverter(Jaxb2Marshaller marshaller) {
MarshallingMessageConverter converter = new MarshallingMessageConverter();
converter.setMarshaller(marshaller);
converter.setUnmarshaller(marshaller);
return converter;
}
}
用法非常简单:将对象转换并发送。等待响应接收 并转换它。
@Component
public class ExampleSenderReceiver {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Override
@SneakyThrows
public ResponseExample sendAndReceive(RequestExample request, String correlationId) {
MessagePostProcessor mpp = message -> {
message = MessageBuilder.fromMessage(message)
.setHeader(JmsHeaders.CORRELATION_ID, correlationId)
// .setHeader(JmsHeaders.REPLY_TO, "DEV.QUEUE.3") this triggers queue creation
.build();
return message;
};
String destination = Objects.requireNonNull(jmsMessagingTemplate.getDefaultDestinationName());
return jmsMessagingTemplate.convertSendAndReceive(destination, request, ResponseExample.class, mpp);
}
我已经阅读了很多 IBM 文档并认为,我需要将 message type 设置为“MQMT_REQUEST”,但我没有找到合适的位置。
更新
添加了 Spring 集成为
@Bean
public MessageChannel requestChannel() {
return new DirectChannel();
}
@Bean
public QueueChannel responseChannel() {
return new QueueChannel();
}
@Bean
@ServiceActivator(inputChannel = "requestChannel" )
public JmsOutboundGateway jmsOutboundGateway( ConnectionFactory connectionFactory) {
JmsOutboundGateway gateway = new JmsOutboundGateway();
gateway.setConnectionFactory(connectionFactory);
gateway.setRequestDestinationName("REQUEST");
gateway.setReplyDestinationName("RESPONSE");
gateway.setReplyChannel(responseChannel());
gateway.setCorrelationKey("JMSCorrelationID*");
gateway.setIdleReplyContainerTimeout(2, TimeUnit.SECONDS);
return gateway;
}
并调整了我的 ExampleSenderReceiver class
@Autowired
@Qualifier("requestChannel")
private MessageChannel requestChannel;
@Autowired
@Qualifier("responseChannel")
private QueueChannel responseChannel;
@Override
@SneakyThrows
public ResponseExample sendAndReceive(RequestExample request, String correlationId) {
String xmlContent = "the marshalled request object";
Map<String, Object> header = new HashMap<>();
header.put(JmsHeaders.CORRELATION_ID, correlationId);
GenericMessage<String> message1 = new GenericMessage<>(xmlContent, header);
requestChannel.send(message1);
log.info("send done" );
Message<?> receive = responseChannel.receive(1500);
if(null != receive){
log.info("incoming: {}", receive.toString());
}
}
重要的部分是gateway.setCorrelationKey("JMSCorrelationID*");
没有该行,correlationId 传播不正确。
下一步是重新添加 MessageConverters 并使其再次美观。
谢谢。
您缺少队列管理器。
ibm:
mq:
queueManager: QM1
channel: chanel
connName: localhost(1414)
user: admin
password: admin
默认的 JmsTemplate(由 JmsMessagingTemplate
使用)始终使用临时回复队列。您可以将其子类化并覆盖 doSendAndReceive(Session session, Destination destination, MessageCreator messageCreator)
以改为使用您的托管队列。
但是,它只有在您一次有一个未完成的请求时才有效(例如,所有 运行 在一个线程上)。您还必须通过检查相关 ID 添加代码以丢弃“迟到”到达。
您可以改用异步发送并处理侦听器容器上的回复并将回复与请求相关联。
考虑改用 spring-integration-jms
及其出站网关 - 它在回复队列处理方面具有更大的灵活性(并为您完成所有关联)。
https://docs.spring.io/spring-integration/reference/html/jms.html#jms-outbound-gateway