ActiveMQ Artemis 和 5.x 侦听器同时存在 - NullPointerException
ActiveMQ Artemis and 5.x listeners at same time - NullPointerException
我有一个遗留 Spring 4.2.1.RELEASE 应用程序作为侦听器连接到 ActiveMQ 5.x,现在我们正在添加到 ActiveMQ Artemis 的连接。对于 Artemis,我们使用持久订阅是因为我们不希望订阅者宕机时主题上的消息丢失,而共享订阅是因为我们希望选择集群或使用并发来异步处理订阅中的消息。我有单独的 ConnectionFactory
s 和 ListenerContainer
s,但是从这个不断重复的 WARN
日志看来,由于以下 NPE,Artemis DMLC 无法启动:
java.lang.NullPointerException
at org.springframework.jms.listener.AbstractMessageListenerContainer.createConsumer(AbstractMessageListenerContainer.java:856)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:213)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1173)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1149)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1142)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1039)
at java.lang.Thread.run(Unknown Source)
表面上看好像找不到方法createSharedDurableConsumer
。查看我的 AbstractMessageListenerContainer,第 856 行正在调用 method.invoke
/** The JMS 2.0 Session.createSharedDurableConsumer method, if available */
private static final Method createSharedDurableConsumerMethod = ClassUtils.getMethodIfAvailable(
Session.class, "createSharedDurableConsumer", Topic.class, String.class, String.class);
...
Method method = (isSubscriptionDurable() ?
createSharedDurableConsumerMethod : createSharedConsumerMethod);
try {
return (MessageConsumer) method.invoke(session, destination, getSubscriptionName(), getMessageSelector());
}
Artemis 配置:
@Configuration
public class ArtemisConfig {
@Autowired
private Environment env;
@Bean
public ConnectionFactory artemisConnectionFactory() {
ActiveMQConnectionFactory artemisConnectionFactory = ActiveMQJMSClient
.createConnectionFactoryWithHA(JMSFactoryType.CF, createTransportConfigurations());
artemisConnectionFactory.setUser(env.getRequiredProperty("artemis.username"));
artemisConnectionFactory.setPassword(env.getRequiredProperty("artemis.password"));
artemisConnectionFactory.setCallTimeout(env.getRequiredProperty("artemis.call.timeout.millis", Long.class));
artemisConnectionFactory.setConnectionTTL(env.getRequiredProperty("artemis.connection.ttl.millis", Long.class));
artemisConnectionFactory
.setCallFailoverTimeout(env.getRequiredProperty("artemis.call.failover.timeout.millis", Long.class));
artemisConnectionFactory.setInitialConnectAttempts(
env.getRequiredProperty("artemis.connection.attempts.initial", Integer.class));
artemisConnectionFactory
.setReconnectAttempts(env.getRequiredProperty("artemis.connection.attempts.reconnect", Integer.class));
artemisConnectionFactory.setRetryInterval(env.getRequiredProperty("artemis.retry.interval.millis", Long.class));
artemisConnectionFactory
.setRetryIntervalMultiplier(env.getRequiredProperty("artemis.retry.interval.multiplier", Double.class));
artemisConnectionFactory.setBlockOnAcknowledge(true);
artemisConnectionFactory.setBlockOnDurableSend(true);
artemisConnectionFactory.setCacheDestinations(true);
artemisConnectionFactory.setConsumerWindowSize(0);
artemisConnectionFactory.setMinLargeMessageSize(1024 * 1024);
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(artemisConnectionFactory);
cachingConnectionFactory
.setSessionCacheSize(env.getRequiredProperty("artemis.session.cache.size", Integer.class));
cachingConnectionFactory.setReconnectOnException(true);
return cachingConnectionFactory;
}
@Bean
public DefaultJmsListenerContainerFactory artemisContainerFactory(ConnectionFactory artemisConnectionFactory,
JmsTransactionManager artemisJmsTransactionManager,
MappingJackson2MessageConverter mappingJackson2MessageConverter) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
factory.setConnectionFactory(artemisConnectionFactory);
factory.setDestinationResolver(new DynamicDestinationResolver());
factory.setMessageConverter(mappingJackson2MessageConverter);
factory.setSubscriptionDurable(Boolean.TRUE);
factory.setSubscriptionShared(Boolean.TRUE);
factory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
factory.setSessionTransacted(Boolean.TRUE);
factory.setTransactionManager(artemisJmsTransactionManager);
return factory;
}
private TransportConfiguration[] createTransportConfigurations() {
String connectorFactoryFqcn = NettyConnectorFactory.class.getName();
Map<String, Object> primaryTransportParameters = new HashMap<>(2, 1F);
String primaryHostname = env.getRequiredProperty("artemis.primary.hostname");
Integer primaryPort = env.getRequiredProperty("artemis.primary.port", Integer.class);
primaryTransportParameters.put("host", primaryHostname);
primaryTransportParameters.put("port", primaryPort);
return new TransportConfiguration[] {
new TransportConfiguration(connectorFactoryFqcn, primaryTransportParameters),
new TransportConfiguration(connectorFactoryFqcn, backupTransportParameters) };
}
}
我的 pom 使用 2.10.0 版的 Artemis。
我该如何解决这个问题?
JMS 2.0 规范向后兼容 JMS 1.1,因此请确保您的类路径中只有 JMS 2 规范。我的预感是 Spring 代码中的反射调用变得混乱,因为它们符合 JMS 1.1 规范 类 而不是正确的 JMS 2 规范 类.
我有一个遗留 Spring 4.2.1.RELEASE 应用程序作为侦听器连接到 ActiveMQ 5.x,现在我们正在添加到 ActiveMQ Artemis 的连接。对于 Artemis,我们使用持久订阅是因为我们不希望订阅者宕机时主题上的消息丢失,而共享订阅是因为我们希望选择集群或使用并发来异步处理订阅中的消息。我有单独的 ConnectionFactory
s 和 ListenerContainer
s,但是从这个不断重复的 WARN
日志看来,由于以下 NPE,Artemis DMLC 无法启动:
java.lang.NullPointerException
at org.springframework.jms.listener.AbstractMessageListenerContainer.createConsumer(AbstractMessageListenerContainer.java:856)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:213)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1173)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1149)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1142)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1039)
at java.lang.Thread.run(Unknown Source)
表面上看好像找不到方法createSharedDurableConsumer
。查看我的 AbstractMessageListenerContainer,第 856 行正在调用 method.invoke
/** The JMS 2.0 Session.createSharedDurableConsumer method, if available */
private static final Method createSharedDurableConsumerMethod = ClassUtils.getMethodIfAvailable(
Session.class, "createSharedDurableConsumer", Topic.class, String.class, String.class);
...
Method method = (isSubscriptionDurable() ?
createSharedDurableConsumerMethod : createSharedConsumerMethod);
try {
return (MessageConsumer) method.invoke(session, destination, getSubscriptionName(), getMessageSelector());
}
Artemis 配置:
@Configuration
public class ArtemisConfig {
@Autowired
private Environment env;
@Bean
public ConnectionFactory artemisConnectionFactory() {
ActiveMQConnectionFactory artemisConnectionFactory = ActiveMQJMSClient
.createConnectionFactoryWithHA(JMSFactoryType.CF, createTransportConfigurations());
artemisConnectionFactory.setUser(env.getRequiredProperty("artemis.username"));
artemisConnectionFactory.setPassword(env.getRequiredProperty("artemis.password"));
artemisConnectionFactory.setCallTimeout(env.getRequiredProperty("artemis.call.timeout.millis", Long.class));
artemisConnectionFactory.setConnectionTTL(env.getRequiredProperty("artemis.connection.ttl.millis", Long.class));
artemisConnectionFactory
.setCallFailoverTimeout(env.getRequiredProperty("artemis.call.failover.timeout.millis", Long.class));
artemisConnectionFactory.setInitialConnectAttempts(
env.getRequiredProperty("artemis.connection.attempts.initial", Integer.class));
artemisConnectionFactory
.setReconnectAttempts(env.getRequiredProperty("artemis.connection.attempts.reconnect", Integer.class));
artemisConnectionFactory.setRetryInterval(env.getRequiredProperty("artemis.retry.interval.millis", Long.class));
artemisConnectionFactory
.setRetryIntervalMultiplier(env.getRequiredProperty("artemis.retry.interval.multiplier", Double.class));
artemisConnectionFactory.setBlockOnAcknowledge(true);
artemisConnectionFactory.setBlockOnDurableSend(true);
artemisConnectionFactory.setCacheDestinations(true);
artemisConnectionFactory.setConsumerWindowSize(0);
artemisConnectionFactory.setMinLargeMessageSize(1024 * 1024);
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(artemisConnectionFactory);
cachingConnectionFactory
.setSessionCacheSize(env.getRequiredProperty("artemis.session.cache.size", Integer.class));
cachingConnectionFactory.setReconnectOnException(true);
return cachingConnectionFactory;
}
@Bean
public DefaultJmsListenerContainerFactory artemisContainerFactory(ConnectionFactory artemisConnectionFactory,
JmsTransactionManager artemisJmsTransactionManager,
MappingJackson2MessageConverter mappingJackson2MessageConverter) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
factory.setConnectionFactory(artemisConnectionFactory);
factory.setDestinationResolver(new DynamicDestinationResolver());
factory.setMessageConverter(mappingJackson2MessageConverter);
factory.setSubscriptionDurable(Boolean.TRUE);
factory.setSubscriptionShared(Boolean.TRUE);
factory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
factory.setSessionTransacted(Boolean.TRUE);
factory.setTransactionManager(artemisJmsTransactionManager);
return factory;
}
private TransportConfiguration[] createTransportConfigurations() {
String connectorFactoryFqcn = NettyConnectorFactory.class.getName();
Map<String, Object> primaryTransportParameters = new HashMap<>(2, 1F);
String primaryHostname = env.getRequiredProperty("artemis.primary.hostname");
Integer primaryPort = env.getRequiredProperty("artemis.primary.port", Integer.class);
primaryTransportParameters.put("host", primaryHostname);
primaryTransportParameters.put("port", primaryPort);
return new TransportConfiguration[] {
new TransportConfiguration(connectorFactoryFqcn, primaryTransportParameters),
new TransportConfiguration(connectorFactoryFqcn, backupTransportParameters) };
}
}
我的 pom 使用 2.10.0 版的 Artemis。
我该如何解决这个问题?
JMS 2.0 规范向后兼容 JMS 1.1,因此请确保您的类路径中只有 JMS 2 规范。我的预感是 Spring 代码中的反射调用变得混乱,因为它们符合 JMS 1.1 规范 类 而不是正确的 JMS 2 规范 类.