当应用程序服务器突然停止时,使用 Spring JMS 和 ActiveMQ 丢失 JMS 消息
Losing JMS Messages with Spring JMS and ActiveMQ when application server is suddenly stopped
我有一个 Spring JMS 应用程序,它有一个在应用程序启动时连接到活动 MQ queue 的 JMS 侦听器。这个 JMS 侦听器是应用程序的一部分,它接收一条消息,用内容丰富它,然后将它传递到同一个 ActiveMQ 代理上的主题。
sessionTransacted 设置为 True。我没有执行任何数据库事务,所以我没有在任何地方设置@Transactional。根据我的阅读,sessionTransacted 属性 围绕 JMS 侦听器的接收方法设置本地事务,因此在事务完成之前它不会从 queue 中提取消息。我已经使用本地 ActiveMQ 实例和我的本地 tomcat 容器对此进行了测试,它按预期工作。
但是,当我部署到我们的 PERF 环境并重试相同的测试时,我注意到服务器关闭时当前 in-flight 的消息在完成之前从 queue 中提取接收方法。
我想知道是否有任何明显的我应该寻找的东西?是否有某些 JMS headers 会导致此行为发生?如果我可以提供更多信息,请告诉我。
我在 Tomcat 7 容器 运行 Java 8.
更新 - 添加我的 Java JMS 配置。请注意,为了清楚起见,我将 PERF 属性文件中的内容替换为相关区域。
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws Throwable {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setMaxMessagesPerTask(-1);
factory.setConcurrency(1);
factory.setSessionTransacted(Boolean.TRUE);
return factory;
}
@Bean
public CachingConnectionFactory connectionFactory(){
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(1000);
redeliveryPolicy.setRedeliveryDelay(1000);
redeliveryPolicy.setMaximumRedeliveries(6);
redeliveryPolicy.setUseExponentialBackOff(Boolean.TRUE);
redeliveryPolicy.setBackOffMultiplier(5);
ActiveMQConnectionFactory activeMQ = new ActiveMQConnectionFactory(environment.getProperty("queue.username"), environment.getProperty("queue.password"), environment.getProperty("jms.broker.endpoint"));
activeMQ.setRedeliveryPolicy(redeliveryPolicy);
activeMQ.setPrefetchPolicy(prefetchPolicy());
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(activeMQ);
cachingConnectionFactory.setCacheConsumers(Boolean.FALSE);
cachingConnectionFactory.setSessionCacheSize(1);
return cachingConnectionFactory;
}
@Bean
public JmsMessagingTemplate jmsMessagingTemplate(){
ActiveMQTopic activeMQ = new ActiveMQTopic(environment.getProperty("queue.out"));
JmsMessagingTemplate jmsMessagingTemplate = new JmsMessagingTemplate(connectionFactory());
jmsMessagingTemplate.setDefaultDestination(activeMQ);
return jmsMessagingTemplate;
}
protected ActiveMQPrefetchPolicy prefetchPolicy(){
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
int prefetchValue = 0;
prefetchPolicy.setQueuePrefetch(prefetchValue);
return prefetchPolicy;
}
谢谢,
胡安
事实证明,我们的 PERF 环境中部署了不同版本的应用程序。更新应用程序后,它会按预期工作。
我有一个 Spring JMS 应用程序,它有一个在应用程序启动时连接到活动 MQ queue 的 JMS 侦听器。这个 JMS 侦听器是应用程序的一部分,它接收一条消息,用内容丰富它,然后将它传递到同一个 ActiveMQ 代理上的主题。
sessionTransacted 设置为 True。我没有执行任何数据库事务,所以我没有在任何地方设置@Transactional。根据我的阅读,sessionTransacted 属性 围绕 JMS 侦听器的接收方法设置本地事务,因此在事务完成之前它不会从 queue 中提取消息。我已经使用本地 ActiveMQ 实例和我的本地 tomcat 容器对此进行了测试,它按预期工作。
但是,当我部署到我们的 PERF 环境并重试相同的测试时,我注意到服务器关闭时当前 in-flight 的消息在完成之前从 queue 中提取接收方法。
我想知道是否有任何明显的我应该寻找的东西?是否有某些 JMS headers 会导致此行为发生?如果我可以提供更多信息,请告诉我。
我在 Tomcat 7 容器 运行 Java 8.
更新 - 添加我的 Java JMS 配置。请注意,为了清楚起见,我将 PERF 属性文件中的内容替换为相关区域。
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws Throwable {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setMaxMessagesPerTask(-1);
factory.setConcurrency(1);
factory.setSessionTransacted(Boolean.TRUE);
return factory;
}
@Bean
public CachingConnectionFactory connectionFactory(){
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(1000);
redeliveryPolicy.setRedeliveryDelay(1000);
redeliveryPolicy.setMaximumRedeliveries(6);
redeliveryPolicy.setUseExponentialBackOff(Boolean.TRUE);
redeliveryPolicy.setBackOffMultiplier(5);
ActiveMQConnectionFactory activeMQ = new ActiveMQConnectionFactory(environment.getProperty("queue.username"), environment.getProperty("queue.password"), environment.getProperty("jms.broker.endpoint"));
activeMQ.setRedeliveryPolicy(redeliveryPolicy);
activeMQ.setPrefetchPolicy(prefetchPolicy());
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(activeMQ);
cachingConnectionFactory.setCacheConsumers(Boolean.FALSE);
cachingConnectionFactory.setSessionCacheSize(1);
return cachingConnectionFactory;
}
@Bean
public JmsMessagingTemplate jmsMessagingTemplate(){
ActiveMQTopic activeMQ = new ActiveMQTopic(environment.getProperty("queue.out"));
JmsMessagingTemplate jmsMessagingTemplate = new JmsMessagingTemplate(connectionFactory());
jmsMessagingTemplate.setDefaultDestination(activeMQ);
return jmsMessagingTemplate;
}
protected ActiveMQPrefetchPolicy prefetchPolicy(){
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
int prefetchValue = 0;
prefetchPolicy.setQueuePrefetch(prefetchValue);
return prefetchPolicy;
}
谢谢,
胡安
事实证明,我们的 PERF 环境中部署了不同版本的应用程序。更新应用程序后,它会按预期工作。