骆驼路线中的生产者流量控制问题(持久消息)
Issue with Producer Flow Control in Camel Route (Persistent Messages)
我在寻找正确的 activemq 配置集以确保 Apache Camel 路由中的消息吞吐量一致时遇到问题。
当前配置使用以下技术:
- 骆驼 (2.15.2)
- ActiveMQ (5.12.1)
Tomcat (7.0.56)
下面是在 Camel 中用于 ActiveMQ 的一组 bean 配置:
<bean id="jmsConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:6616?jms.prefetchPolicy.queuePrefetch=100" />
<property name="watchTopicAdvisories" value="false" />
<property name="producerWindowSize" value="2300" />
</bean>
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
init-method="start" destroy-method="stop">
<property name="maxConnections" value="20" />
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="idleTimeout" value="0"/>
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="pooledConnectionFactory"/>
<property name="transactionManager" ref="jmsTransactionManager"/>
<property name="transacted" value="true"/>
-->
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="configuration" ref="jmsConfig" />
</bean>
这是在 activemq.xml 文件中找到的代理特定配置:
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="localhost" dataDirectory="./activemq/data/" advisorySupport="false">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue="PICKAXE.L5.PROC.>" producerFlowControl="true" storeUsageHighWaterMark="50" />
<policyEntry queue="PICKAXE.L5.COL.>" producerFlowControl="true" storeUsageHighWaterMark="95" />
</policyEntries>
</policyMap>
</destinationPolicy>
<managementContext>
<managementContext createConnector="true" />
</managementContext>
<persistenceAdapter>
<kahaDB directory="./activemq/kahadb/" />
</persistenceAdapter>
<systemUsage>
<systemUsage sendFailIfNoSpaceAfterTimeout="3000000">
<memoryUsage>
<memoryUsage limit="750 mb" />
</memoryUsage>
<storeUsage>
<storeUsage limit="2 gb" />
</storeUsage>
<tempUsage>
<tempUsage limit="500 mb" />
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<transportConnector name="openwire"
uri="tcp://0.0.0.0:6616?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />
<transportConnector name="amqp"
uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />
</transportConnectors>
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
我是运行下面的骆驼路线。队列 A 收到大量消息 (1000/s),因此它开始很快填满,因为这些消息的最终消费者无法跟上。由于消息数量最终达到持久存储生产者流控制规则的 50%,因此阻止将更多消息放置在队列 A 上。但是,当我通过 JMX 检查队列深度时,队列 A 和 B 都不会改变,就好像消费者被阻塞一样也。
from(activemq:queue:PICKAXE.L5.PROC.A)
.to(activemq:queue:PICKAXE.L5.COL.B);
from(activemq:queue:PICKAXE.L5.COL.B)
.autoStartup(!localFlag)
.to(customEndpoint)
.routeId(collectionRouteId);
大约一个星期以来,我尝试了 jms/activemq 配置的各种排列,但没有成功,所以我将不胜感激任何想法。我希望的行为是让此流中的消息消费者继续从持久存储中删除消息,这将允许消息继续彻底流动。
此问题是由过大的 sendFailIfNoSpaceAfterTimeout 引起的,在上述配置中被设置为 3000000。这导致代理在确认 send() 命令因持久存储已满而失败之前等待。
以上配置替换为:
<systemUsage sendFailIfNoSpaceAfterTimeout="300">
这确保(由于消息是持久的并且队列被集成到 Camel 路由中)当持久存储已满导致故障时,每 0.3 秒重试一次 send() 操作。
我在寻找正确的 activemq 配置集以确保 Apache Camel 路由中的消息吞吐量一致时遇到问题。 当前配置使用以下技术:
- 骆驼 (2.15.2)
- ActiveMQ (5.12.1)
Tomcat (7.0.56)
下面是在 Camel 中用于 ActiveMQ 的一组 bean 配置:
<bean id="jmsConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:6616?jms.prefetchPolicy.queuePrefetch=100" /> <property name="watchTopicAdvisories" value="false" /> <property name="producerWindowSize" value="2300" /> </bean> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop"> <property name="maxConnections" value="20" /> <property name="connectionFactory" ref="jmsConnectionFactory" /> <property name="idleTimeout" value="0"/> </bean> <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"> <property name="connectionFactory" ref="pooledConnectionFactory"/> <property name="transactionManager" ref="jmsTransactionManager"/> <property name="transacted" value="true"/>
-->
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager"> <property name="connectionFactory" ref="jmsConnectionFactory" /> </bean> <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> <property name="configuration" ref="jmsConfig" /> </bean>
这是在 activemq.xml 文件中找到的代理特定配置:
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="localhost" dataDirectory="./activemq/data/" advisorySupport="false">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue="PICKAXE.L5.PROC.>" producerFlowControl="true" storeUsageHighWaterMark="50" />
<policyEntry queue="PICKAXE.L5.COL.>" producerFlowControl="true" storeUsageHighWaterMark="95" />
</policyEntries>
</policyMap>
</destinationPolicy>
<managementContext>
<managementContext createConnector="true" />
</managementContext>
<persistenceAdapter>
<kahaDB directory="./activemq/kahadb/" />
</persistenceAdapter>
<systemUsage>
<systemUsage sendFailIfNoSpaceAfterTimeout="3000000">
<memoryUsage>
<memoryUsage limit="750 mb" />
</memoryUsage>
<storeUsage>
<storeUsage limit="2 gb" />
</storeUsage>
<tempUsage>
<tempUsage limit="500 mb" />
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<transportConnector name="openwire"
uri="tcp://0.0.0.0:6616?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />
<transportConnector name="amqp"
uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />
</transportConnectors>
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
我是运行下面的骆驼路线。队列 A 收到大量消息 (1000/s),因此它开始很快填满,因为这些消息的最终消费者无法跟上。由于消息数量最终达到持久存储生产者流控制规则的 50%,因此阻止将更多消息放置在队列 A 上。但是,当我通过 JMX 检查队列深度时,队列 A 和 B 都不会改变,就好像消费者被阻塞一样也。
from(activemq:queue:PICKAXE.L5.PROC.A)
.to(activemq:queue:PICKAXE.L5.COL.B);
from(activemq:queue:PICKAXE.L5.COL.B)
.autoStartup(!localFlag)
.to(customEndpoint)
.routeId(collectionRouteId);
大约一个星期以来,我尝试了 jms/activemq 配置的各种排列,但没有成功,所以我将不胜感激任何想法。我希望的行为是让此流中的消息消费者继续从持久存储中删除消息,这将允许消息继续彻底流动。
此问题是由过大的 sendFailIfNoSpaceAfterTimeout 引起的,在上述配置中被设置为 3000000。这导致代理在确认 send() 命令因持久存储已满而失败之前等待。
以上配置替换为:
<systemUsage sendFailIfNoSpaceAfterTimeout="300">
这确保(由于消息是持久的并且队列被集成到 Camel 路由中)当持久存储已满导致故障时,每 0.3 秒重试一次 send() 操作。