将消息从本地代理传递到断开连接的中央代理
Deliver message from a local broker to a disconected central broker
我有一个特定的要求,我需要将消息发送到服务器,但服务器并不总是可用。
为此,我使用了特定于 ActiveMQ 的代理网络。
目标是拥有一个 local 应用程序 A(仅限生产者),它将消息推送到另一个 central 应用程序 B(仅限消费者).但是网络并不总是可用的。所以应用程序的代理必须存储消息并等待连接,然后才能将消息发送到应用程序 B。所以基本上 A 是一个代理,需要在可用时将消息转发给 B
Broker 的 B 配置包括一个正在侦听以使用消息的持久主题。
正如 ActiveMQ 中所说 documentation 我必须使用静态网桥来做到这一点,这就是我所做的。
注意:我不能让 B 订阅 A,因为 A 会有多个实例,我无法在 B 中配置所有实例。
所以这是我的本地应用程序配置(原始 spring):
<!--As said in http://activemq.apache.org/spring-support.html use
a pooled conntection along with JMSTemplate -->
<amq:connectionFactory id="jmsFactory" brokerURL="${jms.broker.local.url}" />
<!--SpringJMSTemplate -->
<bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory" />
</bean>
<!-- local broker with embedded -->
<bean id="localbroker" class="org.apache.activemq.broker.BrokerService"
init-method="start" destroy-method="stop">
<property name="brokerName" value="localBroker" />
<property name="transportConnectorURIs">
<list>
<value>${jms.broker.local.url}</value>
</list>
</property>
<property name="networkConnectors">
<list>
<ref bean="networkConnector" />
</list>
</property>
</bean>
<amq:connectionFactory id="remoteJmsFactory"
brokerURL="${jms.broker.remote.url}" clientIDPrefix="BRIDGED-TEST" />
<bean id="networkConnector" class="org.apache.activemq.network.DiscoveryNetworkConnector">
<property name="uri" value="static:(${jms.broker.remote.url})"></property>
<property name="staticallyIncludedDestinations">
<list>
<bean class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg type="java.lang.String" value="${jms.topic.sample}"/>
</bean>
</list>
</property>
<property name="staticBridge" value="true"></property><!-- will deliver content even if no consumer, usefull for durable topic only -->
</bean>
localbroker 是一个连接到远程代理(您可以从 apacheMQ 页面下载的应用程序)的嵌入式代理。
这里是中央配置
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" useVirtualDestSubs="true">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<transportConnector name="http" uri="http://0.0.0.0:61612?maximumConnections=1000&wireFormat.maxFrameSize=10485760"/>
</transportConnectors>
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
<import resource="jetty.xml"/>
所以当我尝试 send/receive 消息时发生了什么:
- 如果生产者 (A) 连接并且消费者 (B) 连接到他们各自的代理,并且代理连接在一起,它工作正常。
- 如果消费者 (B) 连接到他的代理并且有消息待处理,而生产者 A 的代理断开连接,它工作正常。
- 如果生产者 (A) 与网络断开连接,当 B 再次可用时,A 的代理不会将消息传递给 B 的代理。
在使用网络连接器之前,我在本地代理配置中尝试使用 outboundTopicBridge 的 jmsbridgeConnector,但没有成功。
问题是:如何让本地的代理 A 在重新连接时向中央的代理 B 发送消息。虽然它不可用,但请确保他不会收到任何消息。os
注意:
- 我工作的网络并不总是可用(可能 天!),我只能依赖 http 端口,这就是为什么它是唯一的打开。这意味着没有多播发现是 possible.
- 消息只能传递一次。
- 我使用本地代理的原因是不管理我必须发送给自己的东西。它们目前仅用于存储和转发到中央。
编辑:我已经能够使用 JMS Bridge 使其工作,但是我有最后一个问题,如果连接 os lost 在应用程序启动或应用程序生命周期期间,我需要重新启动我的经纪人才能发送消息。
我一直在使用这种 "store and forward" 模式并成功使用桥接。
我无法对网络连接器发表评论,但对于网桥,您必须:
- 由于Bug AMQ-5859
,请使用最新版本的 jmeter
- 在桥上加一个
org.apache.activemq.network.jms.ReconnectionPolicy
- 确保在远程代理连接工厂上设置了 reconnectOnException
我已经尝试了所有方法,但仅使用配置无法让它工作,所以我最终自己做了:
<jms:listener-container container-type="default" factory-id="proxyFactory"
acknowledge="transacted" destination-type="topic" connection-factory="jmsFactory">
<bean id="remoteJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="remoteJmsFactory" />
<property name="pubSubDomain" value="true"/>
</bean>
<bean id="simpleMessageProxyListener"
class="com.xxx.jms.test.SimpleMessageProxyListener">
<property name="jmsTemplate" ref="remoteJmsTemplate" />
<property name="queueName" value="${jms.topic.sample}" />
</bean>
基本上我只有一个 class 订阅具有持久订阅的本地代理,并向远程发送消息,如果失败,会话将回滚。
那个简单的代理依赖于 Spring Listener 的容器,所以即使他监听远程代理它也可能能够工作,在我的例子中,它正在监听本地嵌入式代理,所以我不会'没问题。
如果其他人有一个唯一的配置答案,当 stopping/starting 远程代理在本地应用程序运行时有效并且不需要重新启动来发送消息时,请随意 post ,我会点赞和查看。
注意:您必须将 jms.redeliveryPolicy.maximumDeliveries
设置为 -1
才能使其正常工作。
我有一个特定的要求,我需要将消息发送到服务器,但服务器并不总是可用。
为此,我使用了特定于 ActiveMQ 的代理网络。
目标是拥有一个 local 应用程序 A(仅限生产者),它将消息推送到另一个 central 应用程序 B(仅限消费者).但是网络并不总是可用的。所以应用程序的代理必须存储消息并等待连接,然后才能将消息发送到应用程序 B。所以基本上 A 是一个代理,需要在可用时将消息转发给 B
Broker 的 B 配置包括一个正在侦听以使用消息的持久主题。
正如 ActiveMQ 中所说 documentation 我必须使用静态网桥来做到这一点,这就是我所做的。
注意:我不能让 B 订阅 A,因为 A 会有多个实例,我无法在 B 中配置所有实例。
所以这是我的本地应用程序配置(原始 spring):
<!--As said in http://activemq.apache.org/spring-support.html use
a pooled conntection along with JMSTemplate -->
<amq:connectionFactory id="jmsFactory" brokerURL="${jms.broker.local.url}" />
<!--SpringJMSTemplate -->
<bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory" />
</bean>
<!-- local broker with embedded -->
<bean id="localbroker" class="org.apache.activemq.broker.BrokerService"
init-method="start" destroy-method="stop">
<property name="brokerName" value="localBroker" />
<property name="transportConnectorURIs">
<list>
<value>${jms.broker.local.url}</value>
</list>
</property>
<property name="networkConnectors">
<list>
<ref bean="networkConnector" />
</list>
</property>
</bean>
<amq:connectionFactory id="remoteJmsFactory"
brokerURL="${jms.broker.remote.url}" clientIDPrefix="BRIDGED-TEST" />
<bean id="networkConnector" class="org.apache.activemq.network.DiscoveryNetworkConnector">
<property name="uri" value="static:(${jms.broker.remote.url})"></property>
<property name="staticallyIncludedDestinations">
<list>
<bean class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg type="java.lang.String" value="${jms.topic.sample}"/>
</bean>
</list>
</property>
<property name="staticBridge" value="true"></property><!-- will deliver content even if no consumer, usefull for durable topic only -->
</bean>
localbroker 是一个连接到远程代理(您可以从 apacheMQ 页面下载的应用程序)的嵌入式代理。
这里是中央配置
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" useVirtualDestSubs="true">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<transportConnector name="http" uri="http://0.0.0.0:61612?maximumConnections=1000&wireFormat.maxFrameSize=10485760"/>
</transportConnectors>
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
<import resource="jetty.xml"/>
所以当我尝试 send/receive 消息时发生了什么:
- 如果生产者 (A) 连接并且消费者 (B) 连接到他们各自的代理,并且代理连接在一起,它工作正常。
- 如果消费者 (B) 连接到他的代理并且有消息待处理,而生产者 A 的代理断开连接,它工作正常。
- 如果生产者 (A) 与网络断开连接,当 B 再次可用时,A 的代理不会将消息传递给 B 的代理。
在使用网络连接器之前,我在本地代理配置中尝试使用 outboundTopicBridge 的 jmsbridgeConnector,但没有成功。
问题是:如何让本地的代理 A 在重新连接时向中央的代理 B 发送消息。虽然它不可用,但请确保他不会收到任何消息。os
注意:
- 我工作的网络并不总是可用(可能 天!),我只能依赖 http 端口,这就是为什么它是唯一的打开。这意味着没有多播发现是 possible.
- 消息只能传递一次。
- 我使用本地代理的原因是不管理我必须发送给自己的东西。它们目前仅用于存储和转发到中央。
编辑:我已经能够使用 JMS Bridge 使其工作,但是我有最后一个问题,如果连接 os lost 在应用程序启动或应用程序生命周期期间,我需要重新启动我的经纪人才能发送消息。
我一直在使用这种 "store and forward" 模式并成功使用桥接。
我无法对网络连接器发表评论,但对于网桥,您必须:
- 由于Bug AMQ-5859 ,请使用最新版本的 jmeter
- 在桥上加一个
org.apache.activemq.network.jms.ReconnectionPolicy
- 确保在远程代理连接工厂上设置了 reconnectOnException
我已经尝试了所有方法,但仅使用配置无法让它工作,所以我最终自己做了:
<jms:listener-container container-type="default" factory-id="proxyFactory"
acknowledge="transacted" destination-type="topic" connection-factory="jmsFactory">
<bean id="remoteJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="remoteJmsFactory" />
<property name="pubSubDomain" value="true"/>
</bean>
<bean id="simpleMessageProxyListener"
class="com.xxx.jms.test.SimpleMessageProxyListener">
<property name="jmsTemplate" ref="remoteJmsTemplate" />
<property name="queueName" value="${jms.topic.sample}" />
</bean>
基本上我只有一个 class 订阅具有持久订阅的本地代理,并向远程发送消息,如果失败,会话将回滚。
那个简单的代理依赖于 Spring Listener 的容器,所以即使他监听远程代理它也可能能够工作,在我的例子中,它正在监听本地嵌入式代理,所以我不会'没问题。
如果其他人有一个唯一的配置答案,当 stopping/starting 远程代理在本地应用程序运行时有效并且不需要重新启动来发送消息时,请随意 post ,我会点赞和查看。
注意:您必须将 jms.redeliveryPolicy.maximumDeliveries
设置为 -1
才能使其正常工作。