Camel ActiveMQ 如何 destroy/purge 没有消费者的虚拟主题队列

Camel ActiveMQ how to destroy/purge a virtual topic queue without consumers

我的应用程序包含 JBoss Fuse 6.2.1 中的许多 OSGi 包 运行。每个 bundle 都有一个从 ActiveMQ 端点消耗的 Camel 路由。使用 VirtualTopics.

交换数据

ProducerBundle 发布到主题 VirtualTopic.MyTopic

ConsumerBundle A 从队列 Consumer.A.VirtualTopic.MyTopic
消费 ConsumerBundle B 从队列 Consumer.B.VirtualTopic.MyTopic
消费 ConsumerBundle C 从队列 Consumer.C.VirtualTopic.MyTopic

消费

在某一时刻消费者 C 被关闭,它的 bundle 被卸载并且永远不会回来。但是,消息仍然排入 Consumer.C.VirtualTopic.MyTopic 队列。
如何销毁这样的队列?

ActiveMQ 在队列填满时暂停生产者,我不能设置一个小的时间来处理消息,因为其他消费者可能需要一段时间来处理每条消息。我无法修改 VirtualTopic 结构。我可以完全访问 ActiveMQ 配置和 Camel 路由。
还有其他方法可以处理这种情况吗?

<!-- producer route -->
<route id="ProducerRoute"/>
    <from uri="direct:trigger"/>
    <to uri="activemq:topic:VirtualTopic.MyTopic"/>
</route>

<!-- each consumer route -->
<route id="ConsumerARoute">
    <from uri="activemq:Consumer.A.VirtualTopic.MyTopic"/>
    <to uri="bean:myProcessor"/>
</route>

查看 Apache ActiveMQ 文档,了解如何删除不活动的 queues/topics,例如:http://activemq.apache.org/delete-inactive-destinations.html

我选择了积极的解决方案:我挂接到 OSGi 包生命周期,当它停止时我使用 JMX MBeanServer 来销毁现在不需要的队列。

因为我的 bundle 是使用蓝图管理的,所以我选择了一个带有 destroy 方法的 bean。
这是一个示例实现:

我的豆子

package org.darugna.osgi;

import javax.management.MBeanServer;
import javax.management.ObjectName;

public class QueueDestroyer {

    private static final String[] QUEUES_TO_DESTROY = {
        "Consumer.A.VirtualTopic.MyTopic"
    };

    private MBeanServer mBeanServer;

    public void setMbeanServer(MBeanServer mBeanServer) {
        this.mBeanServer = mBeanServer;
    }

    public void destroy() throws Exception {
        ObjectName brokerName = new ObjectName("org.apache.activemq:type=Broker,brokerName=amq");
        for (String queueName : QUEUES_TO_DESTROY) {
            Object returnValue = mBeanServer.invoke(brokerName,
                    "removeQueue",
                    new Object[]{queueName},
                    new String[]{String.class.getName()});
        }
    }

}

Blueprint.xml

<blueprint>

    <reference id="mbeanServer" interface="javax.management.MBeanServer" 
               availability="mandatory"/>

    <bean id="queueDestroyer" class="org.darugna.osgi.QueueDestroyer" 
          destroy-method="destroy">
        <property name="mbeanServer" ref="mbeanServer"/>
    </bean>

    <camelContext>
        <route>
            <from uri="activemq:Consumer.A.VirtualTopic.MyTopic"/>
            <to uri="bean:myProcessor"/>
        </route>
    <camelContext>

</blueprint>    

我有过类似的情况,但我不能使用 Claus 的建议,因为在我的代理中还有其他没有消费者的队列,我不想删除它们。 在我的例子中,我是 运行 a JBoss Fuse 6.1.0 with fabric(我认为这与更新版本的 Fuse 相同):我刚刚删除了消费者(在我的例子中,我只是删除了profile with the consumer)然后我用hawtio控制台中的删除按钮删除了队列。