spring启动配置多个ActiveMQ实例
spring boot configure multiple ActiveMQ instances
我需要将消息从一个 ActiveMQ 实例上的队列移动到另一个 ActiveMQ 实例。有没有办法使用 spring 引导配置连接到两个不同的 ActiveMQ 实例?
我需要创建多个connectionFactories吗?如果是这样,那么 JmsTemplate 如何知道要连接到哪个 ActiveMQ 实例?
@Bean
public ConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory(JMS_BROKER_URL);
}
任何帮助和代码示例都会很有用。
提前致谢。
通用
您需要在您的应用程序中将多个 JmsTemplate
实例实例化为 Beans
,然后使用 @Qualifier
和 @Primary
注释的组合来指示哪个 JmsTemplate
实例应该去哪里。
例如
@Bean("queue1")
@Primary
public JmsTemplate getQueue1(@Qualifier("connectionFactory1")ConnectionFactory factory...){
...
}
@Bean("queue2")
@Primary
public JmsTemplate getQueue2(@Qualifier("connectionFactory2")ConnectionFactory factory...){
...
}
...
@Autowired
@Qualifier("queue1")
private JmsTemplate queue1;
...
有关详细信息,请参阅 here。
除了@Chris 的回复
您必须使用不同的端口创建不同的 BrokerService 实例,并创建不同的 ConnectionFactory 以连接到每个代理,并使用这些不同的工厂创建不同的 JmsTemplate 以将消息发送到不同的代理。
例如:
import javax.jms.ConnectionFactory;
import javax.jms.QueueConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
@Configuration
public class ActiveMQConfigurationForJmsCamelRouteConsumeAndForward {
public static final String LOCAL_Q = "localQ";
public static final String REMOTE_Q = "remoteQ";
@Bean
public BrokerService broker() throws Exception {
final BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:5671");
broker.setBrokerName("broker");
broker.setUseJmx(false);
return broker;
}
@Bean
public BrokerService broker2() throws Exception {
final BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:5672");
broker.setBrokerName("broker2");
broker.setUseJmx(false);
return broker;
}
@Bean
@Primary
public ConnectionFactory jmsConnectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:5671");
return connectionFactory;
}
@Bean
public QueueConnectionFactory jmsConnectionFactory2() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:5672");
return connectionFactory;
}
@Bean
@Primary
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(jmsConnectionFactory());
jmsTemplate.setDefaultDestinationName(LOCAL_Q);
return jmsTemplate;
}
@Bean
public JmsTemplate jmsTemplate2() {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(jmsConnectionFactory2());
jmsTemplate.setDefaultDestinationName(REMOTE_Q);
return jmsTemplate;
}
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerFactory2(
@Qualifier("jmsConnectionFactory2") ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
}
要将消息从一个 AMQ 实例移动到另一个实例,您可以使用 JmsBridgeConnectors
:
请注意,在下面的示例中,您不能在要从中转发消息的队列中有多个消费者,因为 Camel 或 JmsBridgeConnectors 会使用消息并转发它。如果您希望转发邮件的唯一副本,您有一些解决方案:
1- 将您的队列转换为主题,通过持久订阅或追溯消费者管理离线消费者的消息。
2- 将您的队列转换为复合队列并使用 DestinationsInterceptors 将消息复制到另一个队列。
3- 将 NetworkConnector 用于 Networkof brokers
@Bean
public BrokerService broker() throws Exception {
final BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:5671");
SimpleJmsQueueConnector simpleJmsQueueConnector = new SimpleJmsQueueConnector();
OutboundQueueBridge bridge = new OutboundQueueBridge();
bridge.setLocalQueueName(LOCAL_Q);
bridge.setOutboundQueueName(REMOTE_Q);
OutboundQueueBridge[] outboundQueueBridges = new OutboundQueueBridge[] { bridge };
simpleJmsQueueConnector.getReconnectionPolicy().setMaxSendRetries(ReconnectionPolicy.INFINITE);
simpleJmsQueueConnector.setOutboundQueueBridges(outboundQueueBridges);
simpleJmsQueueConnector.setLocalQueueConnectionFactory((QueueConnectionFactory) jmsConnectionFactory());
simpleJmsQueueConnector.setOutboundQueueConnectionFactory(jmsConnectionFactory2());
JmsConnector[] jmsConnectors = new JmsConnector[] { simpleJmsQueueConnector };
broker.setJmsBridgeConnectors(jmsConnectors);
broker.setBrokerName("broker");
broker.setUseJmx(false);
return broker;
}
或者像下面这样使用 Camel :
@Bean
public CamelContext camelContext() throws Exception {
CamelContext context = new DefaultCamelContext();
context.addComponent("inboundQueue", ActiveMQComponent.activeMQComponent("tcp://localhost:5671"));
context.addComponent("outboundQueue", ActiveMQComponent.activeMQComponent("tcp://localhost:5672"));
context.addRoutes(new RouteBuilder() {
public void configure() {
from("inboundQueue:queue:" + LOCAL_Q).to("outboundQueue:queue:" + REMOTE_Q);
}
});
context.start();
return context;
}
您的生产者必须像这样才能使用不同的 JmsTemplates :
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
@Component
public class Producer implements CommandLineRunner {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
@Qualifier("jmsTemplate2")
private JmsTemplate jmsTemplate2;
@Override
public void run(String... args) throws Exception {
send("Sample message");
}
public void send(String msg) {
this.jmsTemplate.convertAndSend(ActiveMQConfigurationForJmsCamelRouteConsumeAndForward.LOCAL_Q, msg);
this.jmsTemplate2.convertAndSend(ActiveMQConfigurationForJmsCamelRouteConsumeAndForward.REMOTE_Q, msg);
}
}
和消费者:
import javax.jms.Session;
import org.apache.activemq.ActiveMQSession;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
@JmsListener(destination = ActiveMQConfigurationForJmsCamelRouteConsumeAndForward.REMOTE_Q, containerFactory = "jmsListenerContainerFactory2")
public void receiveQueue(Session session, String text) {
System.out.println(((ActiveMQSession) session).getConnection().getBrokerInfo());
System.out.println(text);
}
}
您可以使用 Spring 队列消费者的默认引导
@JmsListener(destination = “queue.name")
public void consumer(String message) {
// consume the message
}
对于生产者,您可以创建另一个 JmsTemplate @Bean
@Bean
public JmsTemplate jmsTemplate() {
return new JmsTemplate(new ActiveMQConnectionFactory("tcp://localhost:5671"));
}
通过这种方式,您可以动态注册任意数量的 brokers/listeners:
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.JmsListenerConfigurer;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerEndpointRegistrar;
import org.springframework.jms.config.SimpleJmsListenerEndpoint;
import javax.jms.Message;
import javax.jms.MessageListener;
@Configuration
public class CustomJmsConfigurer implements JmsListenerConfigurer {
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
ActiveMQConnectionFactory amqConnectionFactory = new ActiveMQConnectionFactory();
amqConnectionFactory.setBrokerURL("brokerUrl");
amqConnectionFactory.setUserName("user");
amqConnectionFactory.setPassword("password");
amqConnectionFactory.setExclusiveConsumer(true);
DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
containerFactory.setConnectionFactory(amqConnectionFactory);
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId("someIdentifier");
endpoint.setDestination("queueName");
endpoint.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
// Do your stuff
}
});
registrar.registerEndpoint(endpoint, containerFactory);
}
}
我需要将消息从一个 ActiveMQ 实例上的队列移动到另一个 ActiveMQ 实例。有没有办法使用 spring 引导配置连接到两个不同的 ActiveMQ 实例?
我需要创建多个connectionFactories吗?如果是这样,那么 JmsTemplate 如何知道要连接到哪个 ActiveMQ 实例?
@Bean
public ConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory(JMS_BROKER_URL);
}
任何帮助和代码示例都会很有用。
提前致谢。 通用
您需要在您的应用程序中将多个 JmsTemplate
实例实例化为 Beans
,然后使用 @Qualifier
和 @Primary
注释的组合来指示哪个 JmsTemplate
实例应该去哪里。
例如
@Bean("queue1")
@Primary
public JmsTemplate getQueue1(@Qualifier("connectionFactory1")ConnectionFactory factory...){
...
}
@Bean("queue2")
@Primary
public JmsTemplate getQueue2(@Qualifier("connectionFactory2")ConnectionFactory factory...){
...
}
...
@Autowired
@Qualifier("queue1")
private JmsTemplate queue1;
...
有关详细信息,请参阅 here。
除了@Chris 的回复 您必须使用不同的端口创建不同的 BrokerService 实例,并创建不同的 ConnectionFactory 以连接到每个代理,并使用这些不同的工厂创建不同的 JmsTemplate 以将消息发送到不同的代理。
例如:
import javax.jms.ConnectionFactory;
import javax.jms.QueueConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
@Configuration
public class ActiveMQConfigurationForJmsCamelRouteConsumeAndForward {
public static final String LOCAL_Q = "localQ";
public static final String REMOTE_Q = "remoteQ";
@Bean
public BrokerService broker() throws Exception {
final BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:5671");
broker.setBrokerName("broker");
broker.setUseJmx(false);
return broker;
}
@Bean
public BrokerService broker2() throws Exception {
final BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:5672");
broker.setBrokerName("broker2");
broker.setUseJmx(false);
return broker;
}
@Bean
@Primary
public ConnectionFactory jmsConnectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:5671");
return connectionFactory;
}
@Bean
public QueueConnectionFactory jmsConnectionFactory2() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:5672");
return connectionFactory;
}
@Bean
@Primary
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(jmsConnectionFactory());
jmsTemplate.setDefaultDestinationName(LOCAL_Q);
return jmsTemplate;
}
@Bean
public JmsTemplate jmsTemplate2() {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(jmsConnectionFactory2());
jmsTemplate.setDefaultDestinationName(REMOTE_Q);
return jmsTemplate;
}
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerFactory2(
@Qualifier("jmsConnectionFactory2") ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
}
要将消息从一个 AMQ 实例移动到另一个实例,您可以使用 JmsBridgeConnectors
:
请注意,在下面的示例中,您不能在要从中转发消息的队列中有多个消费者,因为 Camel 或 JmsBridgeConnectors 会使用消息并转发它。如果您希望转发邮件的唯一副本,您有一些解决方案: 1- 将您的队列转换为主题,通过持久订阅或追溯消费者管理离线消费者的消息。 2- 将您的队列转换为复合队列并使用 DestinationsInterceptors 将消息复制到另一个队列。 3- 将 NetworkConnector 用于 Networkof brokers
@Bean
public BrokerService broker() throws Exception {
final BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:5671");
SimpleJmsQueueConnector simpleJmsQueueConnector = new SimpleJmsQueueConnector();
OutboundQueueBridge bridge = new OutboundQueueBridge();
bridge.setLocalQueueName(LOCAL_Q);
bridge.setOutboundQueueName(REMOTE_Q);
OutboundQueueBridge[] outboundQueueBridges = new OutboundQueueBridge[] { bridge };
simpleJmsQueueConnector.getReconnectionPolicy().setMaxSendRetries(ReconnectionPolicy.INFINITE);
simpleJmsQueueConnector.setOutboundQueueBridges(outboundQueueBridges);
simpleJmsQueueConnector.setLocalQueueConnectionFactory((QueueConnectionFactory) jmsConnectionFactory());
simpleJmsQueueConnector.setOutboundQueueConnectionFactory(jmsConnectionFactory2());
JmsConnector[] jmsConnectors = new JmsConnector[] { simpleJmsQueueConnector };
broker.setJmsBridgeConnectors(jmsConnectors);
broker.setBrokerName("broker");
broker.setUseJmx(false);
return broker;
}
或者像下面这样使用 Camel :
@Bean
public CamelContext camelContext() throws Exception {
CamelContext context = new DefaultCamelContext();
context.addComponent("inboundQueue", ActiveMQComponent.activeMQComponent("tcp://localhost:5671"));
context.addComponent("outboundQueue", ActiveMQComponent.activeMQComponent("tcp://localhost:5672"));
context.addRoutes(new RouteBuilder() {
public void configure() {
from("inboundQueue:queue:" + LOCAL_Q).to("outboundQueue:queue:" + REMOTE_Q);
}
});
context.start();
return context;
}
您的生产者必须像这样才能使用不同的 JmsTemplates :
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
@Component
public class Producer implements CommandLineRunner {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
@Qualifier("jmsTemplate2")
private JmsTemplate jmsTemplate2;
@Override
public void run(String... args) throws Exception {
send("Sample message");
}
public void send(String msg) {
this.jmsTemplate.convertAndSend(ActiveMQConfigurationForJmsCamelRouteConsumeAndForward.LOCAL_Q, msg);
this.jmsTemplate2.convertAndSend(ActiveMQConfigurationForJmsCamelRouteConsumeAndForward.REMOTE_Q, msg);
}
}
和消费者:
import javax.jms.Session;
import org.apache.activemq.ActiveMQSession;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
@JmsListener(destination = ActiveMQConfigurationForJmsCamelRouteConsumeAndForward.REMOTE_Q, containerFactory = "jmsListenerContainerFactory2")
public void receiveQueue(Session session, String text) {
System.out.println(((ActiveMQSession) session).getConnection().getBrokerInfo());
System.out.println(text);
}
}
您可以使用 Spring 队列消费者的默认引导
@JmsListener(destination = “queue.name")
public void consumer(String message) {
// consume the message
}
对于生产者,您可以创建另一个 JmsTemplate @Bean
@Bean
public JmsTemplate jmsTemplate() {
return new JmsTemplate(new ActiveMQConnectionFactory("tcp://localhost:5671"));
}
通过这种方式,您可以动态注册任意数量的 brokers/listeners:
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.JmsListenerConfigurer;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerEndpointRegistrar;
import org.springframework.jms.config.SimpleJmsListenerEndpoint;
import javax.jms.Message;
import javax.jms.MessageListener;
@Configuration
public class CustomJmsConfigurer implements JmsListenerConfigurer {
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
ActiveMQConnectionFactory amqConnectionFactory = new ActiveMQConnectionFactory();
amqConnectionFactory.setBrokerURL("brokerUrl");
amqConnectionFactory.setUserName("user");
amqConnectionFactory.setPassword("password");
amqConnectionFactory.setExclusiveConsumer(true);
DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
containerFactory.setConnectionFactory(amqConnectionFactory);
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId("someIdentifier");
endpoint.setDestination("queueName");
endpoint.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
// Do your stuff
}
});
registrar.registerEndpoint(endpoint, containerFactory);
}
}