在 ActiveMQ Artemis 中使用通配符队列时未使用的消息

Messages not consumed when using wildcard queues in ActiveMQ Artemis

如果我在 ActiveMQ Artemis test.A 和通配符队列 test.# 上创建队列,那么我可以将消息发送到 test.A,它也会被传送到 test.# .但是,令我惊讶的是,当我使用来自 test.# 的消息时,该消息仍然存在于 test.A

如何更改我的代码或配置以获得预期的行为?

示例代码:

import org.apache.activemq.artemis.api.core.*;
import org.apache.activemq.artemis.api.core.client.*;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;

public class Main {
    static String AMQ = "vm://0";

    public static void main(String[] args) throws Exception {
        EmbeddedActiveMQ server = null;
        try {
            server = createEmbeddedBroker();

            var serverLocator = ActiveMQClient.createServerLocator(AMQ);
            var clientSessionFactory = serverLocator.createSessionFactory();

            createQueues(clientSessionFactory);

            // queues are empty on creation
            try (var session = clientSessionFactory.createSession()) {
                assertQueueLength(session, "test.#", 0);
                assertQueueLength(session, "test.A", 0);
            }
            
            sendMessage(clientSessionFactory, "test.A");

            // expect message is delivered to both
            try (var session = clientSessionFactory.createSession()) {
                assertQueueLength(session, "test.#", 1);
                assertQueueLength(session, "test.A", 1);
            }

            consumeMessage(clientSessionFactory, "test.#");

            // expect message is consumed from both
            try (var session = clientSessionFactory.createSession()) {
                assertQueueLength(session, "test.#", 0); // ok - message gone
                assertQueueLength(session, "test.A", 0); // fails! 
            }

        } finally {
            if (server != null) server.stop();
        }
    }

    private static EmbeddedActiveMQ createEmbeddedBroker() throws Exception {
        var config = new ConfigurationImpl();
        config.addAcceptorConfiguration("vm", AMQ);
        config.setSecurityEnabled(false);
        config.setPersistenceEnabled(false);

        var server = new EmbeddedActiveMQ();
        server.setConfiguration(config);
        server.start();

        return server;
    }

    private static void createQueues(ClientSessionFactory csf)  throws Exception {
        var session = csf.createSession();

        /*
        <address name="test.A">
            <anycast>
                <queue name="test.A" />
            </anycast>
        </address>
        */
        var testA = new QueueConfiguration("test.A")
            .setRoutingType(RoutingType.ANYCAST)
            .setAddress("test.A");
        session.createQueue(testA);

        /*
        <address name="test.#">
            <anycast>
                <queue name="test.#" />
            </anycast>
        </address>
        */
        var testWildcard = new QueueConfiguration("test.#")
            .setRoutingType(RoutingType.ANYCAST)
            .setAddress("test.#");
        session.createQueue(testWildcard);

        // also tried to create address without a queue, but the message to test.A is not delivered to test.#
        // session.createAddress(new SimpleString("test.#"), RoutingType.ANYCAST, false);
    }

    private static void sendMessage(ClientSessionFactory csf, String queue) throws Exception {
        var session = csf.createSession();
        var producer = session.createProducer(queue);
        producer.send(session.createMessage(true));
        producer.close();
        session.close();
    }

    private static void consumeMessage(ClientSessionFactory csf, String queue) throws Exception {
        var session = csf.createSession();
        var consumer = session.createConsumer(queue);
        consumer.setMessageHandler(message -> {
            try {
                log("Consuming one message from " + queue);
                message.acknowledge();
                log("Consumed one message from " + queue);
            } catch (ActiveMQException e) {
                throw new IllegalStateException(e);
            }
        });
        session.start();

        Thread.sleep(1000); // hack to wait

        consumer.close();
        session.close();
    }

    private static void assertQueueLength(ClientSession session, String queue, long expected) throws Exception {
        long actual = session.queueQuery(SimpleString.toSimpleString(queue)).getMessageCount();
        if (actual != expected) {
            throw new IllegalStateException("Queue " + queue + " has " + actual + " messages. Expected " + expected);
        } else {
            log("Queue " + queue + " has " + actual + " messages as expected");
        }
    }

    private static void log(String msg) {
        System.out.println(">>> " + msg);
    }
}

依赖关系:

org.apache.activemq:artemis-core-client:2.17.0
org.apache.activemq:artemis-server:2.17.0

您所看到的预期的行为。这里要牢记的关键是您正在利用 wildcard routing 而不是通配符 consuming。使用通配符路由,消息不仅会路由到显式发送消息的地址的队列,还会路由到匹配通配符地址的任何队列。消息路由到的每个队列都有自己的消息副本。

通配符路由是在考虑到多播(即 pub/sub)用例(例如分层主题)的情况下实现的,但是如果您想将它与任播一起使用,则有几个选项:

  • 按原样接受语义。
  • 创建地址 test.A 没有 队列,例如:
    session.createAddress(SimpleString.toSimpleString("test.A"), RoutingType.ANYCAST, false);
    
    这是一个完全有效的配置,但您将无法直接使用来自 test.A 的消息,因为不存在这样的队列。您只能使用通配符地址上的队列中的消息。
  • test.A 队列上将无消费者清除设置为 true,例如:
    var testA = new QueueConfiguration("test.A")
        .setRoutingType(RoutingType.ANYCAST)
        .setPurgeOnNoConsumers(true)
        .setAddress("test.A");
    
    此设置将允许队列在消费者连接时接收消息,但一旦最后一个消费者断开连接,队列中的所有消息将被清除,只要没有消费者,消息就不会路由到它。

使用来自消费者的通配符从多个队列接收消息是一个很好的便利功能,但它没有在 ActiveMQ Artemis 中实现。但是,创建多个消费者应该不是很难。