Spring Boot + ActiveMQ 以编程方式即时订阅主题

Spring Boot + ActiveMQ programmatically subscribe to topics on the fly

我正在尝试实现一项功能,我的侦听器 class 可以 subscribe/unsubscribe JMS 主题。 经过一些研究,没有找到一个明确的方法来做到这一点,我想出了两个解决方案:

  1. 有一个监听器 class,它包含一个字符串主题名称列表,并定期 运行 通过所有这些它应该被订阅的主题和 运行 阻塞 jmsTemplate.receiveAndConvert(topicName) 在每个(可能将阻塞操作本身委托给工作池)。 Subscribing/Unsubscribing 从主题中删除主题名称就像从列表中删除主题名称一样简单。
  2. 有一个工厂class,它将为应用程序需要订阅的每个主题构建一个新的侦听器,使用如下方法:

    public MessageListenerContainer createListener(String topic) {
      DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
      container.setConnectionFactory(connectionFactory());
      container.setDestinationName(topic);
      container.setMessageListener(new MyListenerClass());
      return container;
    

    }

第二个选项对我来说似乎更优雅,但我不确定听众的生命周期。我浏览了一些 spring boot 的 jms 和 activemq 模块的源代码并注意到 DefaultMessageListenerContainer 有方法 initialize()start() 虽然我不确定 how/if我需要使用这些,因为我可以找到以这种方式构建的 MessageListenerContainer 的唯一方法是作为 Bean 声明。 此外,当取消订阅主题时,因此想要销毁与其关联的侦听器容器,除了调用 stop(callback) 方法之外,是否还需要做更多的事情?

我对 JMS/ActiveMQ 及其 Spring 集成的理解是否正确,因为没有更简单的方法来实现这一点?我的做法正确吗?

恕我直言,只要你

  • 从spring获取connectionFactory(不是一个PooledConnectionFactory
  • 正确调用 initialise()start() 订阅和 stop() 取消订阅
  • 不要期望在异常情况下重新传递消息

第二种方法应该没问题

要在运行时注册新的 JmsListenerEndpoint,您必须完成 2 个步骤

1 创建自定义 MessageListener服务

@Service
public class CustomMessageListener implements MessageListener {
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("[Custom message listener] " + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

2 注册新端点,使用 JmsListenerEndpointRegistry

@Service
public class MessageListenersService {
    @Autowired
    private JmsListenerEndpointRegistry registry;

    @Autowired
    @Qualifier("containerFactory")
    private DefaultJmsListenerContainerFactory factory;

    public void registerListener(String queueNameToListen, MessageListener listener) {
        SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
        endpoint.setId("ep-"+listener.hashCode()); // ID is mandatory
        endpoint.setMessageListener(listener);
        endpoint.setDestination(queueNameToListen);
        registry.registerListenerContainer(endpoint, factory, true);
    }
}

使用它

    private static final String CUSTOM_DESTINATION = "queue-42";

    @Autowired
    MessageListenersService messageListenersService;
    @Autowired
    CustomMessageListener customMessageListener;
    @Autowired
    JmsTemplate jmsTemplate;

    @PostConstruct
    public void createCustomListener() throws InterruptedException {
        messageListenersService.registerListener(CUSTOM_DESTINATION, customMessageListener);
        jmsTemplate.send(CUSTOM_DESTINATION, session -> session.createTextMessage("hello world"));

        // wait your message:
        TimeUnit.SECONDS.sleep(1);
    }