Routing to virtual destinations inside an ActiveMQ broker

I have an ActiveMQ configuration with one virtual destination and one ordinary topic.

I want to route all JMS messages sent to the destination (VirtualTopic.Notifications) to 2 queues (VirtualTopic.SMS, VirtualTopic.EMAIL) based on the JMSType header of the message.

I want the ordinary topic (VirtualTopic.gps) to keep working as usual.

Here is my activemq.xml configuration. It creates the queues Consumer.SMS.VirtualTopic and Consumer.EMAIL.VirtualTopic.

    <destinations>
        <queue physicalName="Consumer.SMS.VirtualTopic" />
        <queue physicalName="Consumer.EMAIL.VirtualTopic" />
    </destinations>

    <destinationInterceptors>
      <virtualDestinationInterceptor>
        <virtualDestinations>
          <compositeQueue name="VirtualTopic.Notifications" forwardOnly="false">
            <forwardTo>
              <filteredDestination selector="JMSType = 'SMS'" queue="Consumer.SMS.VirtualTopic"/>
              <filteredDestination selector="JMSType = 'EMAIL'" queue="Consumer.EMAIL.VirtualTopic"/>
            </forwardTo>
          </compositeQueue>
        </virtualDestinations>
      </virtualDestinationInterceptor>
    </destinationInterceptors>

The consumers and the topic (VirtualTopic.gps), however, are created from server-side code.

    private static MessageProducer getTopicProducer(String topicName) throws JMSException {
        // Reuse the cached producer for this topic if one already exists.
        MessageProducer producer = topicProducers.get(topicName);

        if (producer == null) {
            logger.info("Creating message producer for Topic : {}", topicName);
            Destination destination = session.createTopic(topicName);

            List<String> queueNames = PropertyReader
                    .getPropertyStringList("jms.topic.consumer.list", JMSProducer.properties);
            if (queueNames != null) {
                for (String queueName : queueNames) {
                    // Open and immediately close a consumer so the broker creates the queue.
                    Queue virtualQueue = session.createQueue(queueName);
                    MessageConsumer con = session.createConsumer(virtualQueue);
                    con.close();
                }
            }

            producer = session.createProducer(destination);
            topicProducers.put(topicName, producer);
        }

        return producer;
    }

All messages sent to VirtualTopic.Notifications are routed to the 2 different queues, and consumers can receive the messages from their respective queues.

But the problem is that all messages sent to VirtualTopic.gps are filtered out, and consumers cannot consume the gps messages.

The following example shows how to set up a <compositeQueue/> element in the XML configuration so that when a message is sent to MY.QUEUE it is actually forwarded to the physical queue FOO and the topic BAR.

<destinationInterceptors>
  <virtualDestinationInterceptor>
    <virtualDestinations>
      <compositeQueue name="MY.QUEUE">
        <forwardTo>
          <queue physicalName="FOO" />
          <topic physicalName="BAR" />
        </forwardTo>
      </compositeQueue>
    </virtualDestinations>
  </virtualDestinationInterceptor>
</destinationInterceptors>

By default, subscribers cannot consume messages directly from a composite queue or topic - it is a logical construct only. Given the configuration above, subscribers can only consume messages from FOO and BAR, but not from MY.QUEUE. This behaviour can be altered by setting the optional forwardOnly attribute to false, which supports use cases such as watching a queue by sending the same messages to a notification topic (wire tapping).

<compositeQueue name="IncomingOrders" forwardOnly="false">
    <forwardTo>
        <topic physicalName="Notifications" />
    </forwardTo>
</compositeQueue>

Messages sent to IncomingOrders will all be copied and forwarded to Notifications, before being placed on the physical IncomingOrders queue for consumption by subscribers.
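To make the forwardOnly behaviour concrete, here is a minimal plain-Java sketch that models it without a broker. It is a toy model, not ActiveMQ code: the class CompositeQueueSketch and its route method are hypothetical names made up for illustration, and the only thing it captures is which destinations end up with a copy of a message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of a composite queue (hypothetical; not ActiveMQ's implementation).
final class CompositeQueueSketch {
    private final String name;
    private final boolean forwardOnly;
    private final List<String> forwardTo;

    CompositeQueueSketch(String name, boolean forwardOnly, List<String> forwardTo) {
        this.name = name;
        this.forwardOnly = forwardOnly;
        this.forwardTo = new ArrayList<>(forwardTo);
    }

    // Returns the destinations that receive a copy of one message sent to this queue.
    List<String> route() {
        List<String> targets = new ArrayList<>(forwardTo);
        if (!forwardOnly) {
            // forwardOnly="false": the composite queue also keeps the message itself.
            targets.add(name);
        }
        return targets;
    }

    public static void main(String[] args) {
        CompositeQueueSketch myQueue =
                new CompositeQueueSketch("MY.QUEUE", true, Arrays.asList("FOO", "BAR"));
        CompositeQueueSketch incomingOrders =
                new CompositeQueueSketch("IncomingOrders", false, Arrays.asList("Notifications"));

        System.out.println(myQueue.route());        // [FOO, BAR]
        System.out.println(incomingOrders.route()); // [Notifications, IncomingOrders]
    }
}
```

In this model MY.QUEUE never holds a message of its own, while IncomingOrders does, which matches the two configurations quoted above.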

See http://activemq.apache.org/virtual-destinations.html

With your actual configuration you can only consume from the SMS and EMAIL queues; if you also want to consume from Notifications, you need to set forwardOnly="false".

Update: try this code:

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQTextMessage;

public class SimpleSenderConsumerVirtualTopic {

    public static void main(String[] args) throws JMSException {
        Connection conn = null;
        try {
            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
            conn = cf.createConnection( );
            ActiveMQSession session = (ActiveMQSession) conn.createSession(false,
                    ActiveMQSession.AUTO_ACKNOWLEDGE);
            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session
                    .createConsumer(session.createQueue("Consumer.A.VirtualTopic.gps"));
            MessageProducer producer = session.createProducer(session.createTopic("VirtualTopic.gps"));
            conn.start();
            ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage("VirtualTopic.gps test");
            producer.send(msg);
            // Drain the consumer queue; stop once no message arrives within 5 seconds.
            while ((msg = (ActiveMQTextMessage) consumer.receive(5000)) != null) {
                System.out.println("Received message is: " + msg.getText());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (Exception e) {
                    // Ignore failures on close; the program is exiting anyway.
                }
            }
        }
    }
}

And add:

<destinationInterceptors>
  <virtualDestinationInterceptor>
    <virtualDestinations>
      <compositeQueue name="VirtualTopic.Notifications" forwardOnly="false">
        <forwardTo>
          <filteredDestination selector="JMSType = 'SMS'" queue="Consumer.SMS.VirtualTopic"/>
          <filteredDestination selector="JMSType = 'EMAIL'" queue="Consumer.EMAIL.VirtualTopic"/>
        </forwardTo>
      </compositeQueue>
      <virtualTopic name=">" selectorAware="false" />
    </virtualDestinations>
  </virtualDestinationInterceptor>
</destinationInterceptors>
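The effect of the two filteredDestination entries can be sketched in plain Java. This is only a toy model under stated assumptions: the class JmsTypeRouterSketch and its targetsFor method are hypothetical, and it handles just the two equality selectors used here, not the full JMS selector syntax that the broker evaluates.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the filtered forwarding configured above (hypothetical names).
final class JmsTypeRouterSketch {
    // JMSType value -> target queue, mirroring the two filteredDestination entries.
    private static final Map<String, String> FILTERED = new LinkedHashMap<>();
    static {
        FILTERED.put("SMS", "Consumer.SMS.VirtualTopic");
        FILTERED.put("EMAIL", "Consumer.EMAIL.VirtualTopic");
    }

    // Returns the destinations that receive a message with the given JMSType.
    static List<String> targetsFor(String jmsType) {
        List<String> targets = new ArrayList<>();
        String queue = FILTERED.get(jmsType);
        if (queue != null) {
            targets.add(queue); // the selector matched this filtered destination
        }
        // forwardOnly="false": the composite queue keeps a copy of every message.
        targets.add("VirtualTopic.Notifications");
        return targets;
    }

    public static void main(String[] args) {
        System.out.println(targetsFor("SMS"));   // [Consumer.SMS.VirtualTopic, VirtualTopic.Notifications]
        System.out.println(targetsFor("EMAIL")); // [Consumer.EMAIL.VirtualTopic, VirtualTopic.Notifications]
        System.out.println(targetsFor("PUSH"));  // [VirtualTopic.Notifications]
    }
}
```

On the producer side the header is set with the standard JMS call message.setJMSType("SMS") before sending to VirtualTopic.Notifications.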

Thanks a lot, Hassen.

Adding this line <virtualTopic name=">" selectorAware="false" /> to activemq.xml did the trick.

    <destinationInterceptors>
        <virtualDestinationInterceptor>
            <virtualDestinations>
                <compositeQueue name="VirtualTopic.Notifications"
                    forwardOnly="false">
                    <forwardTo>
                        <filteredDestination selector="JMSType = 'SMS'"
                            queue="Consumer.SMS.VirtualTopic" />
                        <filteredDestination selector="JMSType ='EMAIL'"
                            queue="Consumer.EMAIL.VirtualTopic" />
                    </forwardTo>
                </compositeQueue>
                <virtualTopic name=">" selectorAware="false" />
            </virtualDestinations>
        </virtualDestinationInterceptor>
    </destinationInterceptors>
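For context on why this line matters: virtual topics rely on a naming convention in which a consumer named A reads the topic VirtualTopic.gps from the queue Consumer.A.VirtualTopic.gps, as in the test client above. The most likely explanation for the original problem is that declaring destinationInterceptors explicitly replaced the broker's default virtual topic interceptor, so that mapping stopped applying until a virtualTopic entry was added back. The helper below is a small sketch of the convention only; the class VirtualTopicNames and its methods are hypothetical, and it assumes the default "Consumer.*." prefix.

```java
import java.util.Optional;

// Sketch of the virtual topic naming convention (hypothetical helper, default prefix assumed).
final class VirtualTopicNames {
    private static final String PREFIX = "Consumer.";

    // Queue that a named consumer reads in order to receive messages from a virtual topic.
    static String consumerQueue(String consumerName, String topicName) {
        return PREFIX + consumerName + "." + topicName;
    }

    // Topic that a consumer queue is bound to, or empty if the name does not follow the convention.
    static Optional<String> topicOf(String queueName) {
        if (!queueName.startsWith(PREFIX)) {
            return Optional.empty();
        }
        // The consumer name is the single segment right after the prefix.
        int dot = queueName.indexOf('.', PREFIX.length());
        if (dot < 0 || dot == queueName.length() - 1) {
            return Optional.empty();
        }
        return Optional.of(queueName.substring(dot + 1));
    }

    public static void main(String[] args) {
        System.out.println(consumerQueue("A", "VirtualTopic.gps"));                // Consumer.A.VirtualTopic.gps
        System.out.println(topicOf("Consumer.A.VirtualTopic.gps").orElse("none")); // VirtualTopic.gps
        System.out.println(topicOf("VirtualTopic.gps").orElse("none"));            // none
    }
}
```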