Custom MessageConverter with Spring JmsMessagingTemplate is not working as expected

I'm trying to attach a custom message converter, one that implements org.springframework.jms.support.converter.MessageConverter, to a JmsMessagingTemplate.

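I read somewhere that we can attach the message converter to a MessagingMessageConverter by calling setPayloadConverter, and then attach that MessagingMessageConverter to the JmsMessagingTemplate via setJmsMessageConverter. After doing that I call convertAndSend, but I noticed that it does not convert the payload.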

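When I debugged the code, I noticed that setting the JMS message converter does not set the converter instance variable in JmsMessagingTemplate. So when convertAndSend calls doConvert and doConvert calls getConverter, it gets the default SimpleMessageConverter rather than my custom one.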

My question is: can I use an implementation of org.springframework.jms.support.converter.MessageConverter with JmsMessagingTemplate, or do I need to use an implementation of org.springframework.messaging.converter.MessageConverter?

I'm using Spring Boot 1.4.1.RELEASE and Spring 4.3.3.RELEASE. The code is below.

Configuration

@Configuration
@EnableJms
public class MessagingEncryptionPocConfig {
    /**
     * Listener ActiveMQ Connection Factory
     */
    @Bean(name="listenerActiveMqConnectionFactory")
    public ActiveMQConnectionFactory listenerActiveMqConnectionFactory() {
        return new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");
    }

    /**
     * Producer ActiveMQ Connection Factory
     */
    @Bean(name="producerActiveMqConnectionFactory")
    public ActiveMQConnectionFactory producerActiveMqConnectionFactory() {
        return new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");
    }   

    /**
     * Caching Connection Factory
     */
    @Bean
    public CachingConnectionFactory cachingConnectionFactory(@Qualifier("producerActiveMqConnectionFactory") ActiveMQConnectionFactory activeMqConnectionFactory) {
        return new CachingConnectionFactory(activeMqConnectionFactory);
    }

    /**
     * JMS Listener Container Factory
     */
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(@Qualifier("listenerActiveMqConnectionFactory") ActiveMQConnectionFactory connectionFactory, MessagingMessageConverter messageConverter) {
        DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();
        defaultJmsListenerContainerFactory.setConnectionFactory(connectionFactory);
        defaultJmsListenerContainerFactory.setMessageConverter(messageConverter);
        return defaultJmsListenerContainerFactory;
    }

    /**
     * Jms Queue Template
     */
    @Bean(name="queueTemplate")
    public JmsMessagingTemplate queueTemplate(CachingConnectionFactory cachingConnectionFactory, MessageConverter messagingMessageConverter) {
        JmsMessagingTemplate queueTemplate = new JmsMessagingTemplate(cachingConnectionFactory);
        queueTemplate.setJmsMessageConverter(messagingMessageConverter);
        return queueTemplate;
    }

    @Bean
    public MessageConverter encryptionDecryptionMessagingConverter(Jaxb2Marshaller jaxb2Marshaller) {
        MessageConverter encryptionDecryptionMessagingConverter = new EncryptionDecryptionMessagingConverter(jaxb2Marshaller);
        MessagingMessageConverter messageConverter = new MessagingMessageConverter();
        messageConverter.setPayloadConverter(encryptionDecryptionMessagingConverter);
        return messageConverter;
    }

    /**
     * Jaxb marshaller
     */
    @Bean(name="producerJaxb2Marshaller")
    public Jaxb2Marshaller jaxb2Marshaller() {
        Jaxb2Marshaller jaxb2Marshaller = new Jaxb2Marshaller();
        jaxb2Marshaller.setPackagesToScan("com.schema");
        return jaxb2Marshaller;
    }
}

MessageProducer Class

@Component 
public class MessageProducer {

    private static final Logger LOG = LoggerFactory.getLogger(MessageProducer.class);

    @Autowired
    @Qualifier("queueTemplate")
    private JmsMessagingTemplate queueTemplate;

    public void publishMsg(Transaction trx, Map<String,Object> jmsHeaders, MessagePostProcessor postProcessor) {
        LOG.info("Sending Message. Payload={} Headers={}",trx,jmsHeaders);
        queueTemplate.convertAndSend("queue.source", trx, jmsHeaders, postProcessor);
    }
}

Unit test

@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("test")
public class WebsMessagingEncryptionPocApplicationTests {
    @Autowired
    private MessageProducer producer;

    @Autowired
    private MessageListener messageListener;    

    /**
     * Ensure that a message is sent, and received.
     */
    @Test
    public void testProducer() throws Exception{
        //ARRANGE
        CountDownLatch latch = new CountDownLatch(1);
        messageListener.setCountDownLatch(latch);
        Transaction trx = new Transaction();
        trx.setCustomerAccountID(new BigInteger("111111"));
        Map<String,Object> jmsHeaders = new HashMap<String,Object>();
        jmsHeaders.put("tid", "1234563423");
        MessagePostProcessor encryptPostProcessor = new EncryptMessagePostProcessor();
        //ACT
        producer.publishMsg(trx, jmsHeaders, encryptPostProcessor);
        latch.await();
        //ASSERT - assertion done in the consumer       
    }
}

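The converter field is used to convert your input parameters to a spring-messaging Message<?>.

The JMS converter is used later (in MessagingMessageCreator) to create a JMS Message from that messaging Message<?>. So the SimpleMessageConverter you see in doConvert only wraps the payload; the converter set through setJmsMessageConverter is applied afterwards, when the JMS message is actually created.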
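To make the order of the two conversions concrete, here is a minimal plain-Java sketch of the flow described above. It is only a model, not Spring's code: Envelope, toEnvelope, toWireMessage and the string-based payload converter are hypothetical stand-ins for Message<?>, the converter field, the JMS converter and the custom payload converter respectively.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class TwoStageSendSketch {

    /** Hypothetical stand-in for a spring-messaging Message<?>: payload plus headers. */
    static final class Envelope {
        final Object payload;
        final Map<String, Object> headers;

        Envelope(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = new HashMap<>(headers);
        }
    }

    /** Stage 1 (models the "converter" field): wraps the raw argument, payload unchanged. */
    static Envelope toEnvelope(Object payload, Map<String, Object> headers) {
        return new Envelope(payload, headers);
    }

    /** Stage 2 (models the JMS converter): the payload converter runs only here. */
    static String toWireMessage(Envelope envelope, Function<Object, String> payloadConverter) {
        return payloadConverter.apply(envelope.payload);
    }

    /** Models convertAndSend as stage 1 followed by stage 2. */
    static String convertAndSend(Object payload, Map<String, Object> headers,
                                 Function<Object, String> payloadConverter) {
        Envelope envelope = toEnvelope(payload, headers); // payload is still unconverted here
        return toWireMessage(envelope, payloadConverter); // custom conversion happens here
    }

    public static void main(String[] args) {
        // Hypothetical custom payload converter, standing in for the encrypting one.
        Function<Object, String> custom = p -> "<encrypted>" + p + "</encrypted>";
        Map<String, Object> headers = new HashMap<>();
        headers.put("tid", "1234563423");

        // After stage 1 the payload is untouched, which matches what the debugger shows in doConvert.
        System.out.println(toEnvelope("trx-111111", headers).payload);
        // After stage 2 the custom converter has been applied.
        System.out.println(convertAndSend("trx-111111", headers, custom));
    }
}
```

If the real flow behaves like this model, a breakpoint inside the custom converter's toMessage method (rather than in doConvert) should show it being invoked during the send.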