正确迭代来自 ActiveMQ DestinationSource.getQueues 响应的队列

Properly iterating over queues from ActiveMQ DestinationSource.getQueues response

由于某些原因,在以下代码中,destinationSource.getQueues() 返回 CopyOnWriteArraySet 而不是简单的 Set。这是一个问题,因为 for 循环在 Set 满之前开始处理,并且由于 CopyOnWriteArraySet 的性质,它只会在循环之前处理 Set 中的项目。我知道我可以在其中添加 Thread.sleep() 但这并不能解决根本问题。是否有任何理由将其作为 CopyOnWriteArraySet 而不是 Set 返回?还有什么方法可以迭代 CopyOnWriteArraySet 以确保覆盖所有项目,甚至是在迭代期间添加的项目?

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection();
activeMQConnection.start();
DestinationSource destinationSource = activeMQConnection.getDestinationSource();

Set<ActiveMQQueue> queues = destinationSource.getQueues();

for(ActiveMQQueue queue : queues) {
  queueNames.add(queue.getPhysicalName());
}

activeMQConnection.close()

编辑:这是我想出的解决方案,虽然它并不完美,但它确保您可以让所有队列排队,直到添加的队列之间的间隔超过 1 秒。

    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

    ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection();

    activeMQConnection.start();

    DestinationSource destinationSource = activeMQConnection.getDestinationSource();

    Set<ActiveMQQueue> queues = destinationSource.getQueues();

    do {
        for(ActiveMQQueue queue : queues) {
            String physcialName = queue.getPhysicalName();
            if(!queueNames.contains(physcialName)) {
                queueNames.add(physcialName);
            }
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            log(e.toString());
        }
    }while(queueNames.size() < queues.size());

    activeMQConnection.close();

正如您所说,CopyOnWriteArraySet 的本性就是这样。队列列表可以与您的线程同时更改。通过向您返回一个 CopyOnWriteArraySet ActiveMQ 为您提供了一种可以在您的线程中安全使用的数据结构(ConcurrentModificationException 没有变化)并且将保持最新。

由于可以随时添加新队列,因此无法 "wait" 直到它们全部完成。

如果你想知道什么时候添加了新的队列然后做一些响应,最好的方法是监听适当的 ActiveMQ advisory message。该工具将使您能够响应消息队列的添加、消费者和生产者的添加,以及它们的删除。 Think link 有一个代码示例。

我在从一个连接中获取所有队列时遇到了同样的问题。 每当我从 DestinationSource 得到队列然后在这个集合上迭代(foreach)时, 我得到了不同数量的队列(在迭代循环中,我总是得到比初始集合更多的队列)。

DestinationSource ds = connection.getDestinationSource();
Set<ActiveMQQueue> queues = ds.getQueues();
log.debug("Found '" + queues.size() + "' queues");
for (ActiveMQQueue queue : queues) {...}

然后,我像这样向目标源添加了一个监听器

DestinationSource ds = connection.getDestinationSource();
Set<ActiveMQQueue> queues = ds.getQueues();
// Add listener:
ds.setDestinationListener(event -> event.hashCode());
log.debug("Found '" + queues.size() + "' queues");
for (ActiveMQQueue queue : queues) {...}

从现在开始,我总是得到正确数量的队列并且可以迭代整个集合。

尽管如此,我真的不知道为什么 ;)