来自另一个 ActiveMQ 实例的 ActiveMQ consume/forward 条消息
ActiveMQ consume/forward messages from another ActiveMQ instance
我有两个代理 A 和 B。如果我想将消息从 A 转发到 B,一切都很简单。我只需要像这样配置的代理中的网络连接器:
<networkConnectors>
<networkConnector staticBridge="true" userName="user" password="pass" uri="static://(tcp://B:61616)">
<staticallyIncludedDestinations>
<queue physicalName="QUEUE.TO.FORWARD.MESSAGE" />
</staticallyIncludedDestinations>
</networkConnector>
</networkConnectors>
如果我想从其他队列(让我们将其命名为 QUEUE.TO.CONSUME)使用来自代理 B 的消息,我很难接受,我只需要做同样的事情,但将双工设置为 true 并只听 QUEUE.TO.CONSUME 在经纪人 A 上是这样的:
<networkConnectors>
<networkConnector name="from-B-to-A" staticBridge="true" duplex="true" userName="user" password="pass" uri="static://(tcp://B:61616)">
<staticallyIncludedDestinations>
<queue physicalName="QUEUE.TO.CONSUME" />
</staticallyIncludedDestinations>
</networkConnector>
<networkConnector staticBridge="true" userName="user" password="pass" uri="static://(tcp://B:61616)">
<staticallyIncludedDestinations>
<queue physicalName="QUEUE.TO.FORWARD.MESSAGE" />
</staticallyIncludedDestinations>
</networkConnector>
</networkConnectors>
但它并没有像我预期的那样工作。似乎只有每一秒的消息被转发,其余的都丢失了。令人惊讶的是,这在代理 B QUEUE.TO.CONSUME 上创建了两个消费者,我假设其中一个消费者在不转发给代理 A 的情况下使用消息。如何在代理 A 上创建桥接器,允许我在不丢失消息的情况下使用来自代理 B 的消息。在代理 B 中创建网络连接器目前不是一个选项。
我也试过像这样创建入站队列桥:
<jmsBridgeConnectors>
<jmsQueueConnector outboundQueueConnectionFactory="#remoteBroker" localUsername="user" localPassword="password">
<inboundQueueBridges>
<inboundQueueBridge inboundQueueName="QUEUE.TO.CONSUME" localQueueName="QUEUE.TO.CONSUME" />
</inboundQueueBridges>
</jmsQueueConnector>
</jmsBridgeConnectors>
...
</broker>
<bean id="remoteBroker" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="failover://(nio:B:61616)" />
<property name="userName" value="user" />
<property name="password" value="password" />
</bean>
此配置在远程代理 B 上创建了消费者,但它不使用任何消息,这些消息只是在排队时挂起而没有任何反应。代理 A 仍然没有收到任何消息到其本地队列。
好的,我明白了。我刚刚使用嵌入式 Apache Camel 定义到远程主机的路由,它看起来像这样(camel.xml 在 conf 目录中):
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:camel="http://camel.apache.org/schema/spring"
xsi:schemaLocation="
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<camelContext id="context" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="remoteBroker:queue:QUEUE.TO.CONSUME"/>
<to uri="localBroker:queue:QUEUE.TO.CONSUME"/>
</route>
</camelContext>
<bean id="remoteBroker" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://B:61616"/>
<property name="userName" value="user"/>
<property name="password" value="password"/>
</bean>
</property>
</bean>
<bean id="localBroker" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://localhost"/>
</bean>
</property>
</bean>
</beans>
其中 localhost 我代理 A。在 activemq.xml:
<import resource="camel.xml"/>
我有两个代理 A 和 B。如果我想将消息从 A 转发到 B,一切都很简单。我只需要像这样配置的代理中的网络连接器:
<networkConnectors>
<networkConnector staticBridge="true" userName="user" password="pass" uri="static://(tcp://B:61616)">
<staticallyIncludedDestinations>
<queue physicalName="QUEUE.TO.FORWARD.MESSAGE" />
</staticallyIncludedDestinations>
</networkConnector>
</networkConnectors>
如果我想从其他队列(让我们将其命名为 QUEUE.TO.CONSUME)使用来自代理 B 的消息,我很难接受,我只需要做同样的事情,但将双工设置为 true 并只听 QUEUE.TO.CONSUME 在经纪人 A 上是这样的:
<networkConnectors>
<networkConnector name="from-B-to-A" staticBridge="true" duplex="true" userName="user" password="pass" uri="static://(tcp://B:61616)">
<staticallyIncludedDestinations>
<queue physicalName="QUEUE.TO.CONSUME" />
</staticallyIncludedDestinations>
</networkConnector>
<networkConnector staticBridge="true" userName="user" password="pass" uri="static://(tcp://B:61616)">
<staticallyIncludedDestinations>
<queue physicalName="QUEUE.TO.FORWARD.MESSAGE" />
</staticallyIncludedDestinations>
</networkConnector>
</networkConnectors>
但它并没有像我预期的那样工作。似乎只有每一秒的消息被转发,其余的都丢失了。令人惊讶的是,这在代理 B QUEUE.TO.CONSUME 上创建了两个消费者,我假设其中一个消费者在不转发给代理 A 的情况下使用消息。如何在代理 A 上创建桥接器,允许我在不丢失消息的情况下使用来自代理 B 的消息。在代理 B 中创建网络连接器目前不是一个选项。 我也试过像这样创建入站队列桥:
<jmsBridgeConnectors>
<jmsQueueConnector outboundQueueConnectionFactory="#remoteBroker" localUsername="user" localPassword="password">
<inboundQueueBridges>
<inboundQueueBridge inboundQueueName="QUEUE.TO.CONSUME" localQueueName="QUEUE.TO.CONSUME" />
</inboundQueueBridges>
</jmsQueueConnector>
</jmsBridgeConnectors>
...
</broker>
<bean id="remoteBroker" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="failover://(nio:B:61616)" />
<property name="userName" value="user" />
<property name="password" value="password" />
</bean>
此配置在远程代理 B 上创建了消费者,但它不使用任何消息,这些消息只是在排队时挂起而没有任何反应。代理 A 仍然没有收到任何消息到其本地队列。
好的,我明白了。我刚刚使用嵌入式 Apache Camel 定义到远程主机的路由,它看起来像这样(camel.xml 在 conf 目录中):
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:camel="http://camel.apache.org/schema/spring"
xsi:schemaLocation="
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<camelContext id="context" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="remoteBroker:queue:QUEUE.TO.CONSUME"/>
<to uri="localBroker:queue:QUEUE.TO.CONSUME"/>
</route>
</camelContext>
<bean id="remoteBroker" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://B:61616"/>
<property name="userName" value="user"/>
<property name="password" value="password"/>
</bean>
</property>
</bean>
<bean id="localBroker" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://localhost"/>
</bean>
</property>
</bean>
</beans>
其中 localhost 我代理 A。在 activemq.xml:
<import resource="camel.xml"/>