如果所有侦听器都被销毁,为什么 ActiveMQ Artemis 会自动删除地址?

Why ActiveMQ Artemis auto delete address if all listeners are destroyed?

我们正在开发一个 micro-service 系统,该系统使用 ActiveMQ Artemis 作为服务之间的通信方法。由于要求能够在 运行 时间停止监听器,我们不能使用 spring-artemis 提供的 @JmsListener。在浏览互联网并发现 spring 在幕后使用 MessageListenerContainer 之后,我们想出了维护一个 MessageListenerContainer 我们自己的列表的想法。

    @Bean(name = "commandJmsListenerContainerFactory")
    public DefaultJmsListenerContainerFactory commandJmsListenerContainerFactory(
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
        var factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setPubSubDomain(false);
        return factory;
    }

    // Use
    private Map<String, DefaultMessageListenerContainer> commandQueue;

    public void subscribeToCommandQueue(String queueName, CommandListener<?> command) {
        commandQueue.computeIfAbsent(queueName, key -> {
            var endPoint = new SimpleJmsListenerEndpoint();
            endPoint.setDestination(queueName);
            endPoint.setMessageListener(message -> {
                try {
                    var body = message.getBody(String.class);
                    command.execute(commandMessageConverter.deserialize(body));
                } catch (JMSException e) {
                    throw new RuntimeException("Error while process message for queue: " + queueName, e);
                }
            });
            var container = commandJmsListenerContainerFactory.createListenerContainer(endPoint);
            // 
            // for Every object of Spring classes that implement InitializingBean created manually, we need to call afterPropertiesSet to make the object "work"
            container.afterPropertiesSet();
            container.start();
            return container;
        });
    }

    public void start() {
        commandQueue = new ConcurrentHashMap<>();
    }

    public void stop() {
        commandQueue.values().forEach(DefaultMessageListenerContainer::destroy);
        commandQueue.clear();
    }

在测试时,我注意到在调用 stop() 销毁所有监听器后,Artemis 控制台中的队列和地址也被删除。持久订阅不是这种情况。

    @Bean(name = "eventJmsListenerContainerFactory")
    public DefaultJmsListenerContainerFactory eventJmsListenerContainerFactory(
        CachingConnectionFactory cachingConnectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
        cachingConnectionFactory.setClientId(UUID.randomUUID().toString());
        var factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, cachingConnectionFactory);
        factory.setPubSubDomain(true);
        factory.setSubscriptionDurable(true);
        return factory;
    }

    // usage is the same as the first block code, except we store multicast subscriptions in another map
    private Map<String, DefaultMessageListenerContainer> eventTopic;

在运行单元测试并销毁两个映射的所有侦听器之后,只保留了test-event-topic地址及其队列,删除了test-command-queue。为什么两个队列的行为不同?

此外,正确的行为是什么?我们担心自动删除会删除队列中尚未发送的消息。另一方面,如果我们 运行 一次又一次地测试, test-event-topic 下的新队列会不断创建。我认为这是因为行 cachingConnectionFactory.setClientId(UUID.randomUUID().toString()); 。但对于持久订阅,不设置 clientId 会导致错误。

应用中使用的连接工厂是由 spring-artemis

创建的 CachingConnectionFactory

默认情况下,当核心 JMS 客户端发送消息或创建消费者时,代理将根据需要自动创建地址和队列。当不再需要这些资源时(即当队列没有消费者和消息时,或者当地址不再绑定任何队列时),这些资源也会默认自动删除。这由 broker.xml 中的这些设置控制,这些设置在 the documentation:

中讨论
  • auto-create-queues
  • auto-delete-queues
  • auto-create-addresses
  • auto-delete-addresses

需要明确的是,默认情况下自动删除不应导致任何消息丢失,因为只有当队列有 0 个消费者和 0 个消息时才应删除它们。但是,您始终可以将自动删除设置为 false 以确保 100% 安全。

表示持久 JMS 主题订阅的队列不会被删除,因为它们旨在在消费者离线时保留和收集消息。换句话说,如果使用订阅的客户端在没有首先显式删除订阅的情况下关闭,持久主题订阅将保留。这就是持久订阅的全部意义所在 - 它们 持久。任何客户端都可以使用持久主题订阅,如果它连接到相同的客户端 ID 并使用相同的订阅名称。但是,除非持久订阅是“共享”持久订阅,否则一次只能连接一个客户端。 JMS 2.0 中添加了共享持久主题订阅。