带有 Spring JmsMessagingTemplate 的自定义 MessageConverter 未按预期工作
Custom MessageConverter with Spring JmsMessagingTemplate is not working as I expected
我正在尝试将实现 org.springframework.jms.support.converter.MessageConverter
的自定义消息转换器附加到 JmsMessagingTemplate
。
我在某处读到,我们可以通过调用 setPayloadConverter
将消息转换器附加到 MessagingMessageConverter
,然后通过 setJmsMessageConverter 将该消息转换器附加到 JmsMessagingTemplate
。之后,我调用 convertAndSend
,但我注意到它没有转换负载。
当我调试代码时,我注意到设置 Jms 消息转换器不会设置 JmsMessagingTemplate
中的 converter
实例变量。因此,当 convertAndSend
方法调用 doConvert
并尝试 getConverter
时,它正在获取默认的简单消息转换器,而不是我自定义的。
我的问题是,我可以将 org.springframework.jms.support.converter.MessageConverter
的实现与 JmsMessagingTemplate
一起使用吗?还是我需要使用 org.springframework.messaging.converter.MessageConverter
的实现?
我正在使用 Spring Boot 1.4.1.RELEASE 和 Spring 4.3.3.RELEASE。代码如下。
配置
/**
 * JMS wiring for the messaging encryption proof of concept.
 *
 * <p>Declares separate ActiveMQ connection factories for the listener and the
 * producer, a caching wrapper around the producer factory, the listener
 * container factory, the {@code queueTemplate} messaging template, the
 * message converter and the JAXB marshaller.
 */
@Configuration
@EnableJms
public class MessagingEncryptionPocConfig {

    /**
     * ActiveMQ connection factory dedicated to the listener side.
     *
     * @return a factory created with the hard-coded credentials and broker URL
     */
    @Bean(name="listenerActiveMqConnectionFactory")
    public ActiveMQConnectionFactory listenerActiveMqConnectionFactory() {
        ActiveMQConnectionFactory listenerFactory =
                new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");
        return listenerFactory;
    }

    /**
     * ActiveMQ connection factory dedicated to the producer side.
     *
     * @return a factory created with the hard-coded credentials and broker URL
     */
    @Bean(name="producerActiveMqConnectionFactory")
    public ActiveMQConnectionFactory producerActiveMqConnectionFactory() {
        ActiveMQConnectionFactory producerFactory =
                new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");
        return producerFactory;
    }

    /**
     * Caching wrapper around the producer connection factory.
     *
     * @param activeMqConnectionFactory the bean qualified as {@code producerActiveMqConnectionFactory}
     * @return a {@code CachingConnectionFactory} delegating to the given factory
     */
    @Bean
    public CachingConnectionFactory cachingConnectionFactory(
            @Qualifier("producerActiveMqConnectionFactory") ActiveMQConnectionFactory activeMqConnectionFactory) {
        CachingConnectionFactory cachingFactory = new CachingConnectionFactory(activeMqConnectionFactory);
        return cachingFactory;
    }

    /**
     * Listener container factory built on the listener connection factory.
     *
     * @param connectionFactory the bean qualified as {@code listenerActiveMqConnectionFactory}
     * @param messageConverter  converter installed on the container factory
     * @return the configured listener container factory
     */
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
            @Qualifier("listenerActiveMqConnectionFactory") ActiveMQConnectionFactory connectionFactory,
            MessagingMessageConverter messageConverter) {
        DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        containerFactory.setMessageConverter(messageConverter);
        return containerFactory;
    }

    /**
     * Messaging template registered under the name {@code queueTemplate}.
     *
     * @param cachingConnectionFactory  connection factory the template is created with
     * @param messagingMessageConverter converter set as the template's JMS message converter
     * @return the configured template
     */
    @Bean(name="queueTemplate")
    public JmsMessagingTemplate queueTemplate(CachingConnectionFactory cachingConnectionFactory,
                                              MessageConverter messagingMessageConverter) {
        JmsMessagingTemplate template = new JmsMessagingTemplate(cachingConnectionFactory);
        template.setJmsMessageConverter(messagingMessageConverter);
        return template;
    }

    /**
     * Message converter bean: a {@code MessagingMessageConverter} whose payload
     * converter is an {@code EncryptionDecryptionMessagingConverter}.
     *
     * @param jaxb2Marshaller marshaller passed to the payload converter's constructor
     * @return the wrapping {@code MessagingMessageConverter}
     */
    @Bean
    public MessageConverter encryptionDecryptionMessagingConverter(Jaxb2Marshaller jaxb2Marshaller) {
        MessageConverter payloadConverter = new EncryptionDecryptionMessagingConverter(jaxb2Marshaller);
        MessagingMessageConverter wrapper = new MessagingMessageConverter();
        wrapper.setPayloadConverter(payloadConverter);
        return wrapper;
    }

    /**
     * JAXB marshaller registered under the name {@code producerJaxb2Marshaller}.
     *
     * @return a marshaller scanning the {@code com.schema} package
     */
    @Bean(name="producerJaxb2Marshaller")
    public Jaxb2Marshaller jaxb2Marshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setPackagesToScan("com.schema");
        return marshaller;
    }
}
MessageProducer Class
/**
 * Component that publishes {@code Transaction} payloads through the
 * {@code queueTemplate} messaging template.
 */
@Component
public class MessageProducer {

    private static final Logger LOG = LoggerFactory.getLogger(MessageProducer.class);

    /** Template injected by qualifier {@code queueTemplate}. */
    @Autowired
    @Qualifier("queueTemplate")
    private JmsMessagingTemplate queueTemplate;

    /**
     * Logs the payload and headers, then converts and sends them to the
     * {@code queue.source} destination.
     *
     * @param trx           payload to send
     * @param jmsHeaders    headers passed along with the payload
     * @param postProcessor post-processor handed to the template's send call
     */
    public void publishMsg(Transaction trx, Map<String,Object> jmsHeaders, MessagePostProcessor postProcessor) {
        final String destination = "queue.source";
        LOG.info("Sending Message. Payload={} Headers={}",trx,jmsHeaders);
        this.queueTemplate.convertAndSend(destination, trx, jmsHeaders, postProcessor);
    }
}
单元测试
/**
 * Spring Boot test (profile {@code test}) that sends one message through
 * {@code MessageProducer} and waits for the listener to signal a latch.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("test")
public class WebsMessagingEncryptionPocApplicationTests {

    @Autowired
    private MessageProducer producer;

    @Autowired
    private MessageListener messageListener;

    /**
     * Publishes a single transaction and blocks until the latch given to the
     * listener reaches zero. The wait has no timeout. Assertions are made in
     * the consumer, not here.
     */
    @Test
    public void testProducer() throws Exception{
        // Arrange: give the listener a one-shot latch (presumably counted down on receipt).
        CountDownLatch received = new CountDownLatch(1);
        messageListener.setCountDownLatch(received);

        Transaction transaction = new Transaction();
        transaction.setCustomerAccountID(new BigInteger("111111"));

        Map<String,Object> headers = new HashMap<String,Object>();
        headers.put("tid", "1234563423");

        MessagePostProcessor postProcessor = new EncryptMessagePostProcessor();

        // Act: publish, then wait for the latch.
        producer.publishMsg(transaction, headers, postProcessor);
        received.await();

        // Assert: performed in the consumer.
    }
}
converter
字段用于将您的输入参数转换为 spring 消息 Message<?>
。
稍后(在 MessagingMessageCreator
中)使用 JMS 转换器根据该 Spring 消息 Message<?>
创建 JMS Message
。
我正在尝试将实现 org.springframework.jms.support.converter.MessageConverter
的自定义消息转换器附加到 JmsMessagingTemplate
。
我在某处读到,我们可以通过调用 setPayloadConverter
将消息转换器附加到 MessagingMessageConverter
,然后通过 setJmsMessageConverter 将该消息转换器附加到 JmsMessagingTemplate
。之后,我调用 convertAndSend
,但我注意到它没有转换负载。
当我调试代码时,我注意到设置 Jms 消息转换器不会设置 JmsMessagingTemplate
中的 converter
实例变量。因此,当 convertAndSend
方法调用 doConvert
并尝试 getConverter
时,它正在获取默认的简单消息转换器,而不是我自定义的。
我的问题是,我可以将 org.springframework.jms.support.converter.MessageConverter
的实现与 JmsMessagingTemplate
一起使用吗?还是我需要使用 org.springframework.messaging.converter.MessageConverter
的实现?
我正在使用 Spring Boot 1.4.1.RELEASE 和 Spring 4.3.3.RELEASE。代码如下。
配置
/**
 * JMS wiring for the messaging encryption proof of concept.
 *
 * <p>Declares separate ActiveMQ connection factories for the listener and the
 * producer, a caching wrapper around the producer factory, the listener
 * container factory, the {@code queueTemplate} messaging template, the
 * message converter and the JAXB marshaller.
 */
@Configuration
@EnableJms
public class MessagingEncryptionPocConfig {

    /**
     * ActiveMQ connection factory dedicated to the listener side.
     *
     * @return a factory created with the hard-coded credentials and broker URL
     */
    @Bean(name="listenerActiveMqConnectionFactory")
    public ActiveMQConnectionFactory listenerActiveMqConnectionFactory() {
        ActiveMQConnectionFactory listenerFactory =
                new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");
        return listenerFactory;
    }

    /**
     * ActiveMQ connection factory dedicated to the producer side.
     *
     * @return a factory created with the hard-coded credentials and broker URL
     */
    @Bean(name="producerActiveMqConnectionFactory")
    public ActiveMQConnectionFactory producerActiveMqConnectionFactory() {
        ActiveMQConnectionFactory producerFactory =
                new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");
        return producerFactory;
    }

    /**
     * Caching wrapper around the producer connection factory.
     *
     * @param activeMqConnectionFactory the bean qualified as {@code producerActiveMqConnectionFactory}
     * @return a {@code CachingConnectionFactory} delegating to the given factory
     */
    @Bean
    public CachingConnectionFactory cachingConnectionFactory(
            @Qualifier("producerActiveMqConnectionFactory") ActiveMQConnectionFactory activeMqConnectionFactory) {
        CachingConnectionFactory cachingFactory = new CachingConnectionFactory(activeMqConnectionFactory);
        return cachingFactory;
    }

    /**
     * Listener container factory built on the listener connection factory.
     *
     * @param connectionFactory the bean qualified as {@code listenerActiveMqConnectionFactory}
     * @param messageConverter  converter installed on the container factory
     * @return the configured listener container factory
     */
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
            @Qualifier("listenerActiveMqConnectionFactory") ActiveMQConnectionFactory connectionFactory,
            MessagingMessageConverter messageConverter) {
        DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        containerFactory.setMessageConverter(messageConverter);
        return containerFactory;
    }

    /**
     * Messaging template registered under the name {@code queueTemplate}.
     *
     * @param cachingConnectionFactory  connection factory the template is created with
     * @param messagingMessageConverter converter set as the template's JMS message converter
     * @return the configured template
     */
    @Bean(name="queueTemplate")
    public JmsMessagingTemplate queueTemplate(CachingConnectionFactory cachingConnectionFactory,
                                              MessageConverter messagingMessageConverter) {
        JmsMessagingTemplate template = new JmsMessagingTemplate(cachingConnectionFactory);
        template.setJmsMessageConverter(messagingMessageConverter);
        return template;
    }

    /**
     * Message converter bean: a {@code MessagingMessageConverter} whose payload
     * converter is an {@code EncryptionDecryptionMessagingConverter}.
     *
     * @param jaxb2Marshaller marshaller passed to the payload converter's constructor
     * @return the wrapping {@code MessagingMessageConverter}
     */
    @Bean
    public MessageConverter encryptionDecryptionMessagingConverter(Jaxb2Marshaller jaxb2Marshaller) {
        MessageConverter payloadConverter = new EncryptionDecryptionMessagingConverter(jaxb2Marshaller);
        MessagingMessageConverter wrapper = new MessagingMessageConverter();
        wrapper.setPayloadConverter(payloadConverter);
        return wrapper;
    }

    /**
     * JAXB marshaller registered under the name {@code producerJaxb2Marshaller}.
     *
     * @return a marshaller scanning the {@code com.schema} package
     */
    @Bean(name="producerJaxb2Marshaller")
    public Jaxb2Marshaller jaxb2Marshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setPackagesToScan("com.schema");
        return marshaller;
    }
}
MessageProducer Class
/**
 * Component that publishes {@code Transaction} payloads through the
 * {@code queueTemplate} messaging template.
 */
@Component
public class MessageProducer {

    private static final Logger LOG = LoggerFactory.getLogger(MessageProducer.class);

    /** Template injected by qualifier {@code queueTemplate}. */
    @Autowired
    @Qualifier("queueTemplate")
    private JmsMessagingTemplate queueTemplate;

    /**
     * Logs the payload and headers, then converts and sends them to the
     * {@code queue.source} destination.
     *
     * @param trx           payload to send
     * @param jmsHeaders    headers passed along with the payload
     * @param postProcessor post-processor handed to the template's send call
     */
    public void publishMsg(Transaction trx, Map<String,Object> jmsHeaders, MessagePostProcessor postProcessor) {
        final String destination = "queue.source";
        LOG.info("Sending Message. Payload={} Headers={}",trx,jmsHeaders);
        this.queueTemplate.convertAndSend(destination, trx, jmsHeaders, postProcessor);
    }
}
单元测试
/**
 * Spring Boot test (profile {@code test}) that sends one message through
 * {@code MessageProducer} and waits for the listener to signal a latch.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("test")
public class WebsMessagingEncryptionPocApplicationTests {

    @Autowired
    private MessageProducer producer;

    @Autowired
    private MessageListener messageListener;

    /**
     * Publishes a single transaction and blocks until the latch given to the
     * listener reaches zero. The wait has no timeout. Assertions are made in
     * the consumer, not here.
     */
    @Test
    public void testProducer() throws Exception{
        // Arrange: give the listener a one-shot latch (presumably counted down on receipt).
        CountDownLatch received = new CountDownLatch(1);
        messageListener.setCountDownLatch(received);

        Transaction transaction = new Transaction();
        transaction.setCustomerAccountID(new BigInteger("111111"));

        Map<String,Object> headers = new HashMap<String,Object>();
        headers.put("tid", "1234563423");

        MessagePostProcessor postProcessor = new EncryptMessagePostProcessor();

        // Act: publish, then wait for the latch.
        producer.publishMsg(transaction, headers, postProcessor);
        received.await();

        // Assert: performed in the consumer.
    }
}
converter
字段用于将您的输入参数转换为 spring 消息 Message<?>
。
稍后(在 MessagingMessageCreator
中)使用 JMS 转换器根据该 Spring 消息 Message<?>
创建 JMS Message
。