Spring 将 apache activemq 引导迁移到 artemis

Spring boot migration of apache activemq to artemis

我正在使用带 spring 启动的 apache activemq,我想迁移到 apache artemis 以提高集群和节点的使用率。

目前我主要使用 VirtualTopics 的概念和 JMS,例如

@JMSListener(destination = "Consumer.A.VirtualTopic.simple")
public void receiveMessage() {
    ...
}

...

public void send(JMSTemplate template) {
    template.convertAndSend("VirtualTopic.simple", "Hello world!");
}

我读过,artemis 将其地址模型更改为 地址、队列和路由类型 而不是 队列、主题和虚拟主题就像在 activemq 中一样。 我已经阅读了很多,但我认为我不正确,我现在如何迁移。我尝试了与上面相同的方法,所以我从 Maven 导入了 Artemis JMSClient 并想像以前一样使用它,但是使用 FQQN(完全限定队列名称)VirtualTopic-Wildcard 你可以阅读一些资源。但不知何故它不能正常工作。

我的问题是: - 如何迁移 VirtualTopics?我用 FQQN 和那些 VirtualTopics-Wildcards 做对了吗? - 如何为上面的代码示例指定路由类型任播和多播? (在在线示例中,地址和队列在服务器 broker.xml 中是硬编码的,但我想在应用程序运行时创建它。) - 如何将它与 openwire 协议一起使用,应用程序如何知道它使用什么?它只取决于我使用的 artemis 端口吗?那么 61616 用于 openwire?

谁能帮我理清思路?

更新:

还有一些问题。

1) 我总是读类似 "a default 5.x consumer" 的东西。预计它会与阿尔忒弥斯混在一起吗?就像您保留所有这些命名约定,只是将地址添加到 VirtualTopic 名称到 FQQN,然后将依赖项更改为 artemis?

2) 我已经用 "import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;""import org.apache.activemq.ActiveMQConnectionFactory;" 尝试过 "virtualTopicConsumerWildcards",但仅在第二种情况下有所不同。

3) 我还尝试在接受器中仅使用 OpenWire 作为协议,但在这种情况下(以及使用 "import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;")我在启动我的应用程序时遇到以下错误:“2020-03-30 11:41:19,504 ERROR [org.apache.activemq.artemis.core.server] AMQ224096: Error setting up connection from /127.0.0.1:54201 to /127.0.0.1:61616; protocol CORE not found in map: [OPENWIRE]”。

4) 我是否将 multicast:://VirtualTopic.simple 作为目的地名称放入 template.convertAndSend(...)? 我尝试 template.setPubSubDomain(true) 用于多播路由类型,并将其保留为任播,这有效。但这是一个好方法吗?

5) 你可能知道,我如何 "tell" 我的 spring-boot-application with template.convertAndSend(...); 使用 Openwire?

更新2: 共享持久订阅

@JmsListener(destination = "VirtualTopic.test", id = "c1", subscription = "Consumer.A.VirtualTopic.test", containerFactory = "queueConnectionFactory")
public void receive1(String m) {

}

@JmsListener(destination = "VirtualTopic.test", id = "c2", subscription = "Consumer.B.VirtualTopic.test", containerFactory = "queueConnectionFactory")
public void receive2(String m) {

}

@Bean
public DefaultJmsListenerContainerFactory queueConnectionFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setClientId("brokerClientId");
    factory.setSubscriptionDurable(true);
    factory.setSubscriptionShared(true);
    return factory;
}

错误:

2020-04-17 11:23:44.485  WARN 7900 --- [enerContainer-3] o.s.j.l.DefaultMessageListenerContainer  : Setup of JMS message listener invoker failed for destination 'VirtualTopic.test' - trying to recover. Cause: org.apache.activemq.ActiveMQSession.createSharedDurableConsumer(Ljavax/jms/Topic;Ljava/lang/String;Ljava/lang/String;)Ljavax/jms/MessageConsumer; 
2020-04-17 11:23:44.514 ERROR 7900 --- [enerContainer-3] o.s.j.l.DefaultMessageListenerContainer  : Could not refresh JMS Connection for destination 'VirtualTopic.test' - retrying using FixedBackOff{interval=5000, currentAttempts=0, maxAttempts=unlimited}. Cause: Broker: d1 - Client: brokerClientId already connected from /127.0.0.1:59979

我做错了什么?

虚拟主题背后的想法是生产者以通常的 JMS 方式发送到主题,而消费者可以从物理队列中消费逻辑主题订阅,允许许多消费者在许多机器上 运行 & 线程来负载平衡负载。

Artemis 在内部使用每个主题订阅者模型的队列,并且可以使用其 Fully Qualified Queue name (FQQN) 直接寻址订阅队列。

例如,主题 VirtualTopic.simple 订阅 A Consumer.A.VirtualTopic.simple 的默认 5.x 消费者目的地将替换为由地址和队列 VirtualTopic.simple::Consumer.A.VirtualTopic.simple 组成的 Artemis FQQN。

但是 Artemis 支持 virtual topic wildcard filter mechanism,它会自动将消费者目的地转换为相应的 FQQN。要启用过滤机制,配置字符串 属性 virtualTopicConsumerWildcards 可以使用。它有两个部分,由 ; 分隔,即默认 5.x 消费者前缀为 Consumer.*. 的虚拟主题,需要 virtualTopicConsumerWildcards 过滤器 Consumer.*.>;2

Artemis 默认配置为自动创建客户请求的目的地。他们可以在连接到地址时指定一个特殊的前缀,以指示使用哪种路由类型。可以通过向接受器添加配置字符串 属性 anycastPrefixmulticastPrefix 来启用它们,您可以在 Using Prefixes to Determine Routing Type 中找到更多详细信息。例如添加到接受器anycastPrefix=anycast://;multicastPrefix=multicast://,如果客户端只需要发送消息到ANYCAST队列之一应该使用目的地anycast:://VirtualTopic.simple,如果客户端需要发送消息到MULTICAST应该使用目的地 multicast:://VirtualTopic.simple.

Artemis acceptors 支持对所有协议使用单个端口,它们会自动检测正在使用的协议是 CORE、AMQP、STOMP 还是 OPENWIRE,但是可以通过使用协议参数。

以下接受器启用任播前缀 anycast://、多播前缀 multicast:// 和虚拟主题消费者通配符,在端点 localhost:61616.[=30 上禁用除 OPENWIRE 之外的所有协议=]

<acceptor name="artemis">tcp://localhost:61616?anycastPrefix=anycast://;multicastPrefix=multicast://;virtualTopicConsumerWildcards=Consumer.*.%3E%3B2;protocols=OPENWIRE</acceptor>

更新: 以下示例应用程序使用 OpenWire 协议连接到带有先前接受器的 Artemis 实例。

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

@SpringBootApplication
@EnableJms
public class Application {

   private final String BROKER_URL = "tcp://localhost:61616";
   private final String BROKER_USERNAME = "admin";
   private final String BROKER_PASSWORD = "admin";

   public static void main(String[] args) throws Exception {
      final ConfigurableApplicationContext context = SpringApplication.run(Application.class);
      System.out.println("********************* Sending message...");

      JmsTemplate jmsTemplate = context.getBean("jmsTemplate", JmsTemplate.class);
      JmsTemplate jmsTemplateAnycast = context.getBean("jmsTemplateAnycast", JmsTemplate.class);
      JmsTemplate jmsTemplateMulticast = context.getBean("jmsTemplateMulticast", JmsTemplate.class);

      jmsTemplateAnycast.convertAndSend("VirtualTopic.simple", "Hello world anycast!");
      jmsTemplate.convertAndSend("anycast://VirtualTopic.simple", "Hello world anycast using prefix!");
      jmsTemplateMulticast.convertAndSend("VirtualTopic.simple", "Hello world multicast!");
      jmsTemplate.convertAndSend("multicast://VirtualTopic.simple", "Hello world multicast using prefix!");

      System.out.print("Press any key to close the context");
      System.in.read();

      context.close();
   }

   @Bean
   public ActiveMQConnectionFactory connectionFactory(){
      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
      connectionFactory.setBrokerURL(BROKER_URL);
      connectionFactory.setUserName(BROKER_USERNAME);
      connectionFactory.setPassword(BROKER_PASSWORD);
      return connectionFactory;
   }

   @Bean
   public JmsTemplate jmsTemplate(){
      JmsTemplate template = new JmsTemplate();
      template.setConnectionFactory(connectionFactory());
      return template;
   }

   @Bean
   public JmsTemplate jmsTemplateAnycast(){
      JmsTemplate template = new JmsTemplate();
      template.setPubSubDomain(false);
      template.setConnectionFactory(connectionFactory());
      return template;
   }

   @Bean
   public JmsTemplate jmsTemplateMulticast(){
      JmsTemplate template = new JmsTemplate();
      template.setPubSubDomain(true);
      template.setConnectionFactory(connectionFactory());
      return template;
   }

   @Bean
   public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
      DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
      factory.setConnectionFactory(connectionFactory());
      factory.setConcurrency("1-1");
      return factory;
   }

   @JmsListener(destination = "Consumer.A.VirtualTopic.simple")
   public void receiveMessageFromA(String message) {
      System.out.println("*********************** MESSAGE RECEIVED FROM A: " + message);
   }

   @JmsListener(destination = "Consumer.B.VirtualTopic.simple")
   public void receiveMessageFromB(String message) {
      System.out.println("*********************** MESSAGE RECEIVED FROM B: " + message);
   }

   @JmsListener(destination = "VirtualTopic.simple")
   public void receiveMessageFromTopic(String message) {
      System.out.println("*********************** MESSAGE RECEIVED FROM TOPIC: " + message);
   }
}