路由到 ActiveMQ 代理内的虚拟目的地
Routing to virtual destinations inside ActiveMQ broker
我有一个 activemq 配置,其中有一个虚拟目标和一个普通主题
我想根据消息中的 JMSType 头(header),将所有发送到目的地 (VirtualTopic.Notifications) 的 JMS 消息路由到 2 个队列 (VirtualTopic.SMS, VirtualTopic.EMAIL)。
我希望正常的主题 (VirtualTopic.gps) 像往常一样工作。
这是我对activemq.xml的配置。这里创建了 Consumer.SMS.VirtualTopic 和 Consumer.EMAIL.VirtualTopic。
<destinations>
<queue physicalName="Consumer.SMS.VirtualTopic" />
<queue physicalName="Consumer.EMAIL.VirtualTopic" />
</destinations>
<!-- Broker-side virtual destinations: routes messages sent to VirtualTopic.Notifications by their JMSType. -->
<!-- NOTE(review): no virtualTopic entry is declared in this version; the fix reported later in this discussion adds one. -->
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<!-- Messages sent to VirtualTopic.Notifications are forwarded to the destinations listed in forwardTo. -->
<!-- NOTE(review): forwardOnly="false" presumably also leaves a copy on VirtualTopic.Notifications itself; confirm against the ActiveMQ virtual destinations documentation. -->
<compositeQueue name="VirtualTopic.Notifications" forwardOnly="false">
<forwardTo>
<!-- Each filteredDestination names a target queue and the selector a message must match to be forwarded there. -->
<filteredDestination selector="JMSType = 'SMS'" queue="Consumer.SMS.VirtualTopic"/>
<filteredDestination selector="JMSType = 'EMAIL'" queue="Consumer.EMAIL.VirtualTopic"/>
</forwardTo>
</compositeQueue>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
而消费者和主题 (VirtualTopic.gps) 则是由服务器端代码创建的。
/**
 * Returns the producer cached for {@code topicName}, creating and caching it on first use.
 *
 * <p>On a cache miss, a consumer is opened and immediately closed on every queue named by
 * the {@code jms.topic.consumer.list} property before the producer is created.
 *
 * @param topicName name of the topic the producer publishes to
 * @return the cached or newly created producer for the topic
 * @throws JMSException if the topic, a queue, a consumer or the producer cannot be created
 */
private static MessageProducer getTopicProducer(String topicName) throws JMSException {
    MessageProducer cached = topicProducers.get(topicName);
    if (cached != null) {
        return cached;
    }

    logger.info("Creating message producer for Topic : {}", topicName);
    Destination topic = session.createTopic(topicName);

    List<String> consumerQueues = PropertyReader
            .getPropertyStringList("jms.topic.consumer.list", JMSProducer.properties);
    if (consumerQueues != null) {
        for (String consumerQueue : consumerQueues) {
            // NOTE(review): presumably the short-lived consumer exists only to make the
            // broker create the queue before anything is published; confirm with the author.
            Queue target = session.createQueue(consumerQueue);
            session.createConsumer(target).close();
        }
    }

    MessageProducer created = session.createProducer(topic);
    topicProducers.put(topicName, created);
    return created;
}
发送到 VirtualTopic.Notifications 的所有消息都被路由到 2 个不同的队列,消费者可以从各自的队列中获取消息。
但问题是所有发送到 VirtualTopic.gps 的消息都被过滤掉了,消费者无法消费 gps 消息。
The following example shows how to set up a <compositeQueue/> element
in the XML configuration so that when a message is sent to MY.QUEUE
then it is really forwarded to the physical queue FOO and the topic
BAR.
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<compositeQueue name="MY.QUEUE">
<forwardTo>
<queue physicalName="FOO" />
<topic physicalName="BAR" />
</forwardTo>
</compositeQueue>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
By default, subscribers cannot consume messages directly from a
composite queue or topic - it is a logical construct only. Given the
configuration above, subscribers can only consume messages from FOO
and BAR; but not MY.QUEUE. This behaviour can be altered to implement
use cases such as watching a queue by sending the same messages to a
notification topic (wire tapping), by setting the optionally set
forwardOnly attribute to false.
<compositeQueue name="IncomingOrders" forwardOnly="false">
<forwardTo>
<topic physicalName="Notifications" />
</forwardTo>
</compositeQueue>
Messages sent to IncomingOrders will all be copied and forwarded to
Notifications, before being placed on the physical IncomingOrders
queue for consumption by subscribers.
看这里http://activemq.apache.org/virtual-destinations.html
根据您的实际配置,您只能从 SMS 和 EMAIL 这两个队列中消费消息;如果您还想从 Notifications 中消费,则需要设置 forwardOnly="false"。
更新:
试试这个代码:
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQTextMessage;
/**
 * Minimal round-trip check for an ActiveMQ virtual topic: subscribes to the queue
 * {@code Consumer.A.VirtualTopic.gps}, publishes one text message to the topic
 * {@code VirtualTopic.gps}, and prints every message received from the queue.
 */
public class SimpleSenderConsumerVirtualTopic {

    /**
     * Sends one test message and prints whatever arrives on the consumer queue.
     *
     * @param args unused
     * @throws JMSException kept in the signature for compatibility; failures raised inside
     *     the method are caught and printed rather than propagated
     */
    public static void main(String[] args) throws JMSException {
        Connection conn = null;
        try {
            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
            conn = cf.createConnection();
            ActiveMQSession session = (ActiveMQSession) conn.createSession(false,
                    ActiveMQSession.AUTO_ACKNOWLEDGE);
            // The consumer is created before the message is sent.
            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session
                    .createConsumer(session.createQueue("Consumer.A.VirtualTopic.gps"));
            MessageProducer producer = session.createProducer(session.createTopic("VirtualTopic.gps"));
            conn.start();
            producer.send(session.createTextMessage("VirtualTopic.gps test"));
            // Drain the queue; stop once no message arrives within 5 seconds.
            ActiveMQTextMessage msg;
            while ((msg = (ActiveMQTextMessage) consumer.receive(5000)) != null) {
                System.out.println("Received message is: " + msg.getText());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (Exception closeFailure) {
                    // Closing stays best-effort, but the failure is reported instead of
                    // being silently discarded.
                    System.err.println("Failed to close JMS connection: " + closeFailure);
                }
            }
        }
    }
}
并添加:
<!-- Broker-side virtual destinations: JMSType-based routing for VirtualTopic.Notifications plus a wildcard virtual topic entry. -->
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<!-- Messages sent to VirtualTopic.Notifications are forwarded to the destinations listed in forwardTo. -->
<!-- NOTE(review): forwardOnly="false" presumably also leaves a copy on VirtualTopic.Notifications itself; confirm against the ActiveMQ virtual destinations documentation. -->
<compositeQueue name="VirtualTopic.Notifications" forwardOnly="false">
<forwardTo>
<!-- Each filteredDestination names a target queue and the selector a message must match to be forwarded there. -->
<filteredDestination selector="JMSType = 'SMS'" queue="Consumer.SMS.VirtualTopic"/>
<filteredDestination selector="JMSType = 'EMAIL'" queue="Consumer.EMAIL.VirtualTopic"/>
</forwardTo>
</compositeQueue>
<!-- Wildcard virtual topic entry; this is the line reported in this discussion as making VirtualTopic.gps consumers receive messages again. -->
<!-- NOTE(review): presumably required because declaring custom virtualDestinations replaces the broker's default virtual topic handling; confirm. -->
<virtualTopic name=">" selectorAware="false" />
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
非常感谢哈森..
在 activemq.xml 中添加这一行 <virtualTopic name=">" selectorAware="false" /> 之后,
问题解决了。最终的配置如下:
<!-- Final broker configuration reported as working: JMSType-based routing plus a wildcard virtual topic entry. -->
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<!-- Messages sent to VirtualTopic.Notifications are forwarded to the destinations listed in forwardTo. -->
<compositeQueue name="VirtualTopic.Notifications"
forwardOnly="false">
<forwardTo>
<!-- Each filteredDestination names a target queue and the selector a message must match to be forwarded there. -->
<filteredDestination selector="JMSType = 'SMS'"
queue="Consumer.SMS.VirtualTopic" />
<filteredDestination selector="JMSType ='EMAIL'"
queue="Consumer.EMAIL.VirtualTopic" />
</forwardTo>
</compositeQueue>
<!-- Wildcard virtual topic entry; the line the asker reports as fixing delivery for VirtualTopic.gps. -->
<virtualTopic name=">" selectorAware="false" />
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
我有一个 activemq 配置,其中有一个虚拟目标和一个普通主题
我想根据消息中的 JMSType 头(header),将所有发送到目的地 (VirtualTopic.Notifications) 的 JMS 消息路由到 2 个队列 (VirtualTopic.SMS, VirtualTopic.EMAIL)。
我希望正常的主题 (VirtualTopic.gps) 像往常一样工作。
这是我对activemq.xml的配置。这里创建了 Consumer.SMS.VirtualTopic 和 Consumer.EMAIL.VirtualTopic。
<destinations>
<queue physicalName="Consumer.SMS.VirtualTopic" />
<queue physicalName="Consumer.EMAIL.VirtualTopic" />
</destinations>
<!-- Broker-side virtual destinations: routes messages sent to VirtualTopic.Notifications by their JMSType. -->
<!-- NOTE(review): no virtualTopic entry is declared in this version; the fix reported later in this discussion adds one. -->
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<!-- Messages sent to VirtualTopic.Notifications are forwarded to the destinations listed in forwardTo. -->
<!-- NOTE(review): forwardOnly="false" presumably also leaves a copy on VirtualTopic.Notifications itself; confirm against the ActiveMQ virtual destinations documentation. -->
<compositeQueue name="VirtualTopic.Notifications" forwardOnly="false">
<forwardTo>
<!-- Each filteredDestination names a target queue and the selector a message must match to be forwarded there. -->
<filteredDestination selector="JMSType = 'SMS'" queue="Consumer.SMS.VirtualTopic"/>
<filteredDestination selector="JMSType = 'EMAIL'" queue="Consumer.EMAIL.VirtualTopic"/>
</forwardTo>
</compositeQueue>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
而消费者和主题 (VirtualTopic.gps) 则是由服务器端代码创建的。
/**
 * Returns the producer cached for {@code topicName}, creating and caching it on first use.
 *
 * <p>On a cache miss, a consumer is opened and immediately closed on every queue named by
 * the {@code jms.topic.consumer.list} property before the producer is created.
 *
 * @param topicName name of the topic the producer publishes to
 * @return the cached or newly created producer for the topic
 * @throws JMSException if the topic, a queue, a consumer or the producer cannot be created
 */
private static MessageProducer getTopicProducer(String topicName) throws JMSException {
    MessageProducer cached = topicProducers.get(topicName);
    if (cached != null) {
        return cached;
    }

    logger.info("Creating message producer for Topic : {}", topicName);
    Destination topic = session.createTopic(topicName);

    List<String> consumerQueues = PropertyReader
            .getPropertyStringList("jms.topic.consumer.list", JMSProducer.properties);
    if (consumerQueues != null) {
        for (String consumerQueue : consumerQueues) {
            // NOTE(review): presumably the short-lived consumer exists only to make the
            // broker create the queue before anything is published; confirm with the author.
            Queue target = session.createQueue(consumerQueue);
            session.createConsumer(target).close();
        }
    }

    MessageProducer created = session.createProducer(topic);
    topicProducers.put(topicName, created);
    return created;
}
发送到 VirtualTopic.Notifications 的所有消息都被路由到 2 个不同的队列,消费者可以从各自的队列中获取消息。
但问题是所有发送到 VirtualTopic.gps 的消息都被过滤掉了,消费者无法消费 gps 消息。
The following example shows how to set up a <compositeQueue/> element in the XML configuration so that when a message is sent to MY.QUEUE then it is really forwarded to the physical queue FOO and the topic BAR.
<destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <compositeQueue name="MY.QUEUE"> <forwardTo> <queue physicalName="FOO" /> <topic physicalName="BAR" /> </forwardTo> </compositeQueue> </virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors>
By default, subscribers cannot consume messages directly from a composite queue or topic - it is a logical construct only. Given the configuration above, subscribers can only consume messages from FOO and BAR; but not MY.QUEUE. This behaviour can be altered to implement use cases such as watching a queue by sending the same messages to a notification topic (wire tapping), by setting the optionally set forwardOnly attribute to false.
<compositeQueue name="IncomingOrders" forwardOnly="false"> <forwardTo> <topic physicalName="Notifications" /> </forwardTo> </compositeQueue>
Messages sent to IncomingOrders will all be copied and forwarded to Notifications, before being placed on the physical IncomingOrders queue for consumption by subscribers.
看这里http://activemq.apache.org/virtual-destinations.html
根据您的实际配置,您只能从 SMS 和 EMAIL 这两个队列中消费消息;如果您还想从 Notifications 中消费,则需要设置 forwardOnly="false"。
更新: 试试这个代码:
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQTextMessage;
/**
 * Minimal round-trip check for an ActiveMQ virtual topic: subscribes to the queue
 * {@code Consumer.A.VirtualTopic.gps}, publishes one text message to the topic
 * {@code VirtualTopic.gps}, and prints every message received from the queue.
 */
public class SimpleSenderConsumerVirtualTopic {

    /**
     * Sends one test message and prints whatever arrives on the consumer queue.
     *
     * @param args unused
     * @throws JMSException kept in the signature for compatibility; failures raised inside
     *     the method are caught and printed rather than propagated
     */
    public static void main(String[] args) throws JMSException {
        Connection conn = null;
        try {
            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
            conn = cf.createConnection();
            ActiveMQSession session = (ActiveMQSession) conn.createSession(false,
                    ActiveMQSession.AUTO_ACKNOWLEDGE);
            // The consumer is created before the message is sent.
            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session
                    .createConsumer(session.createQueue("Consumer.A.VirtualTopic.gps"));
            MessageProducer producer = session.createProducer(session.createTopic("VirtualTopic.gps"));
            conn.start();
            producer.send(session.createTextMessage("VirtualTopic.gps test"));
            // Drain the queue; stop once no message arrives within 5 seconds.
            ActiveMQTextMessage msg;
            while ((msg = (ActiveMQTextMessage) consumer.receive(5000)) != null) {
                System.out.println("Received message is: " + msg.getText());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (Exception closeFailure) {
                    // Closing stays best-effort, but the failure is reported instead of
                    // being silently discarded.
                    System.err.println("Failed to close JMS connection: " + closeFailure);
                }
            }
        }
    }
}
并添加:
<!-- Broker-side virtual destinations: JMSType-based routing for VirtualTopic.Notifications plus a wildcard virtual topic entry. -->
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<!-- Messages sent to VirtualTopic.Notifications are forwarded to the destinations listed in forwardTo. -->
<!-- NOTE(review): forwardOnly="false" presumably also leaves a copy on VirtualTopic.Notifications itself; confirm against the ActiveMQ virtual destinations documentation. -->
<compositeQueue name="VirtualTopic.Notifications" forwardOnly="false">
<forwardTo>
<!-- Each filteredDestination names a target queue and the selector a message must match to be forwarded there. -->
<filteredDestination selector="JMSType = 'SMS'" queue="Consumer.SMS.VirtualTopic"/>
<filteredDestination selector="JMSType = 'EMAIL'" queue="Consumer.EMAIL.VirtualTopic"/>
</forwardTo>
</compositeQueue>
<!-- Wildcard virtual topic entry; this is the line reported in this discussion as making VirtualTopic.gps consumers receive messages again. -->
<!-- NOTE(review): presumably required because declaring custom virtualDestinations replaces the broker's default virtual topic handling; confirm. -->
<virtualTopic name=">" selectorAware="false" />
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
非常感谢哈森..
在 activemq.xml 中添加这一行 <virtualTopic name=">" selectorAware="false" /> 之后,
问题解决了。最终的配置如下:
<!-- Final broker configuration reported as working: JMSType-based routing plus a wildcard virtual topic entry. -->
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<!-- Messages sent to VirtualTopic.Notifications are forwarded to the destinations listed in forwardTo. -->
<compositeQueue name="VirtualTopic.Notifications"
forwardOnly="false">
<forwardTo>
<!-- Each filteredDestination names a target queue and the selector a message must match to be forwarded there. -->
<filteredDestination selector="JMSType = 'SMS'"
queue="Consumer.SMS.VirtualTopic" />
<filteredDestination selector="JMSType ='EMAIL'"
queue="Consumer.EMAIL.VirtualTopic" />
</forwardTo>
</compositeQueue>
<!-- Wildcard virtual topic entry; the line the asker reports as fixing delivery for VirtualTopic.gps. -->
<virtualTopic name=">" selectorAware="false" />
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>