当应用程序服务器突然停止时,使用 Spring JMS 和 ActiveMQ 丢失 JMS 消息

Losing JMS Messages with Spring JMS and ActiveMQ when application server is suddenly stopped

我有一个 Spring JMS 应用程序,它有一个在应用程序启动时连接到活动 MQ queue 的 JMS 侦听器。这个 JMS 侦听器是应用程序的一部分,它接收一条消息,用内容丰富它,然后将它传递到同一个 ActiveMQ 代理上的主题。

sessionTransacted 设置为 True。我没有执行任何数据库事务,所以我没有在任何地方设置@Transactional。根据我的阅读,sessionTransacted 属性 围绕 JMS 侦听器的接收方法设置本地事务,因此在事务完成之前它不会从 queue 中提取消息。我已经使用本地 ActiveMQ 实例和我的本地 tomcat 容器对此进行了测试,它按预期工作。

但是,当我部署到我们的 PERF 环境并重试相同的测试时,我注意到服务器关闭时当前 in-flight 的消息在完成之前从 queue 中提取接收方法。

我想知道是否有任何明显的我应该寻找的东西?是否有某些 JMS headers 会导致此行为发生?如果我可以提供更多信息,请告诉我。

我在 Tomcat 7 容器 运行 Java 8.

更新 - 添加我的 Java JMS 配置。请注意,为了清楚起见,我将 PERF 属性文件中的内容替换为相关区域。

    @Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws Throwable {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setMaxMessagesPerTask(-1);
    factory.setConcurrency(1);
    factory.setSessionTransacted(Boolean.TRUE);
    return factory;
}

@Bean
public CachingConnectionFactory connectionFactory(){
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();

    redeliveryPolicy.setInitialRedeliveryDelay(1000);
    redeliveryPolicy.setRedeliveryDelay(1000);
    redeliveryPolicy.setMaximumRedeliveries(6);
    redeliveryPolicy.setUseExponentialBackOff(Boolean.TRUE);
    redeliveryPolicy.setBackOffMultiplier(5);

    ActiveMQConnectionFactory activeMQ = new ActiveMQConnectionFactory(environment.getProperty("queue.username"), environment.getProperty("queue.password"), environment.getProperty("jms.broker.endpoint"));
    activeMQ.setRedeliveryPolicy(redeliveryPolicy);
    activeMQ.setPrefetchPolicy(prefetchPolicy());

    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(activeMQ);
    cachingConnectionFactory.setCacheConsumers(Boolean.FALSE);
    cachingConnectionFactory.setSessionCacheSize(1);
    return cachingConnectionFactory;
}

@Bean
public JmsMessagingTemplate jmsMessagingTemplate(){
    ActiveMQTopic activeMQ = new ActiveMQTopic(environment.getProperty("queue.out"));

    JmsMessagingTemplate jmsMessagingTemplate = new JmsMessagingTemplate(connectionFactory());
    jmsMessagingTemplate.setDefaultDestination(activeMQ);

    return jmsMessagingTemplate;
}

protected ActiveMQPrefetchPolicy prefetchPolicy(){
    ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
    int prefetchValue = 0; 
    prefetchPolicy.setQueuePrefetch(prefetchValue);
    return prefetchPolicy;
}

谢谢,

胡安

事实证明,我们的 PERF 环境中部署了不同版本的应用程序。更新应用程序后,它会按预期工作。