Camel ActiveMQ 如何 destroy/purge 没有消费者的虚拟主题队列
Camel ActiveMQ how to destroy/purge a virtual topic queue without consumers
我的应用程序包含 JBoss Fuse 6.2.1 中的许多 OSGi 包 运行。每个 bundle 都有一个从 ActiveMQ 端点消耗的 Camel 路由。使用 VirtualTopics.
交换数据
ProducerBundle 发布到主题 VirtualTopic.MyTopic
ConsumerBundle A 从队列 Consumer.A.VirtualTopic.MyTopic
消费
ConsumerBundle B 从队列 Consumer.B.VirtualTopic.MyTopic
消费
ConsumerBundle C 从队列 Consumer.C.VirtualTopic.MyTopic
消费
在某一时刻消费者 C 被关闭,它的 bundle 被卸载并且永远不会回来。但是,消息仍然排入 Consumer.C.VirtualTopic.MyTopic
队列。
如何销毁这样的队列?
ActiveMQ 在队列填满时暂停生产者,我不能设置一个小的时间来处理消息,因为其他消费者可能需要一段时间来处理每条消息。我无法修改 VirtualTopic 结构。我可以完全访问
ActiveMQ 配置和 Camel 路由。
还有其他方法可以处理这种情况吗?
<!-- producer route -->
<route id="ProducerRoute"/>
<from uri="direct:trigger"/>
<to uri="activemq:topic:VirtualTopic.MyTopic"/>
</route>
<!-- each consumer route -->
<route id="ConsumerARoute">
<from uri="activemq:Consumer.A.VirtualTopic.MyTopic"/>
<to uri="bean:myProcessor"/>
</route>
查看 Apache ActiveMQ 文档,了解如何删除不活动的 queues/topics,例如:http://activemq.apache.org/delete-inactive-destinations.html
我选择了积极的解决方案:我挂接到 OSGi 包生命周期,当它停止时我使用 JMX MBeanServer 来销毁现在不需要的队列。
因为我的 bundle 是使用蓝图管理的,所以我选择了一个带有 destroy 方法的 bean。
这是一个示例实现:
我的豆子
package org.darugna.osgi;
import javax.management.MBeanServer;
import javax.management.ObjectName;
public class QueueDestroyer {
private static final String[] QUEUES_TO_DESTROY = {
"Consumer.A.VirtualTopic.MyTopic"
};
private MBeanServer mBeanServer;
public void setMbeanServer(MBeanServer mBeanServer) {
this.mBeanServer = mBeanServer;
}
public void destroy() throws Exception {
ObjectName brokerName = new ObjectName("org.apache.activemq:type=Broker,brokerName=amq");
for (String queueName : QUEUES_TO_DESTROY) {
Object returnValue = mBeanServer.invoke(brokerName,
"removeQueue",
new Object[]{queueName},
new String[]{String.class.getName()});
}
}
}
Blueprint.xml
<blueprint>
<reference id="mbeanServer" interface="javax.management.MBeanServer"
availability="mandatory"/>
<bean id="queueDestroyer" class="org.darugna.osgi.QueueDestroyer"
destroy-method="destroy">
<property name="mbeanServer" ref="mbeanServer"/>
</bean>
<camelContext>
<route>
<from uri="activemq:Consumer.A.VirtualTopic.MyTopic"/>
<to uri="bean:myProcessor"/>
</route>
<camelContext>
</blueprint>
我有过类似的情况,但我不能使用 Claus 的建议,因为在我的代理中还有其他没有消费者的队列,我不想删除它们。
在我的例子中,我是 运行 a JBoss Fuse 6.1.0 with fabric(我认为这与更新版本的 Fuse 相同):我刚刚删除了消费者(在我的例子中,我只是删除了profile with the consumer)然后我用hawtio控制台中的删除按钮删除了队列。
我的应用程序包含 JBoss Fuse 6.2.1 中的许多 OSGi 包 运行。每个 bundle 都有一个从 ActiveMQ 端点消耗的 Camel 路由。使用 VirtualTopics.
交换数据ProducerBundle 发布到主题 VirtualTopic.MyTopic
ConsumerBundle A 从队列 Consumer.A.VirtualTopic.MyTopic
消费
ConsumerBundle B 从队列 Consumer.B.VirtualTopic.MyTopic
消费
ConsumerBundle C 从队列 Consumer.C.VirtualTopic.MyTopic
在某一时刻消费者 C 被关闭,它的 bundle 被卸载并且永远不会回来。但是,消息仍然排入 Consumer.C.VirtualTopic.MyTopic
队列。
如何销毁这样的队列?
ActiveMQ 在队列填满时暂停生产者,我不能设置一个小的时间来处理消息,因为其他消费者可能需要一段时间来处理每条消息。我无法修改 VirtualTopic 结构。我可以完全访问
ActiveMQ 配置和 Camel 路由。
还有其他方法可以处理这种情况吗?
<!-- producer route -->
<route id="ProducerRoute"/>
<from uri="direct:trigger"/>
<to uri="activemq:topic:VirtualTopic.MyTopic"/>
</route>
<!-- each consumer route -->
<route id="ConsumerARoute">
<from uri="activemq:Consumer.A.VirtualTopic.MyTopic"/>
<to uri="bean:myProcessor"/>
</route>
查看 Apache ActiveMQ 文档,了解如何删除不活动的 queues/topics,例如:http://activemq.apache.org/delete-inactive-destinations.html
我选择了积极的解决方案:我挂接到 OSGi 包生命周期,当它停止时我使用 JMX MBeanServer 来销毁现在不需要的队列。
因为我的 bundle 是使用蓝图管理的,所以我选择了一个带有 destroy 方法的 bean。
这是一个示例实现:
我的豆子
package org.darugna.osgi;
import javax.management.MBeanServer;
import javax.management.ObjectName;
public class QueueDestroyer {
private static final String[] QUEUES_TO_DESTROY = {
"Consumer.A.VirtualTopic.MyTopic"
};
private MBeanServer mBeanServer;
public void setMbeanServer(MBeanServer mBeanServer) {
this.mBeanServer = mBeanServer;
}
public void destroy() throws Exception {
ObjectName brokerName = new ObjectName("org.apache.activemq:type=Broker,brokerName=amq");
for (String queueName : QUEUES_TO_DESTROY) {
Object returnValue = mBeanServer.invoke(brokerName,
"removeQueue",
new Object[]{queueName},
new String[]{String.class.getName()});
}
}
}
Blueprint.xml
<blueprint>
<reference id="mbeanServer" interface="javax.management.MBeanServer"
availability="mandatory"/>
<bean id="queueDestroyer" class="org.darugna.osgi.QueueDestroyer"
destroy-method="destroy">
<property name="mbeanServer" ref="mbeanServer"/>
</bean>
<camelContext>
<route>
<from uri="activemq:Consumer.A.VirtualTopic.MyTopic"/>
<to uri="bean:myProcessor"/>
</route>
<camelContext>
</blueprint>
我有过类似的情况,但我不能使用 Claus 的建议,因为在我的代理中还有其他没有消费者的队列,我不想删除它们。 在我的例子中,我是 运行 a JBoss Fuse 6.1.0 with fabric(我认为这与更新版本的 Fuse 相同):我刚刚删除了消费者(在我的例子中,我只是删除了profile with the consumer)然后我用hawtio控制台中的删除按钮删除了队列。