ActiveMQ 是否支持多个事务性消费者?

Does ActiveMQ support multiple transactional consumers?

我正在 ServiceMix 中开发 OSGI 包以从同一个 ActiveMQ 队列中使用。

我需要确保只有在一切顺利时才将消息从队列中取出,这样我就不会丢失该消息。所以我使用了骆驼交易客户端。我按照本教程 http://camel.apache.org/transactional-client.html

设法实现了它

但是,问题是当部署许多从同一个队列消费的 bundle 时,我只有一个消费者同时工作。

我应该怎么做才能在 ActiveMQ 中启用并行事务消费?

提前致谢。

此致,


这是我的实现(刚刚从教程中复制):

骆驼路线:

   <route>
      <from uri="activemq:queue:foo"/>
      <transacted ref="required"/>
      <process ref="myProcessor"/>
      <to uri="activemq:queue:bar"/>
    </route>

Spring 上下文:

<!-- setup JMS connection factory -->
<bean id="poolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="maxConnections" value="8"/>
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="vm://localhost?broker.persistent=false&amp;broker.useJmx=false"/>
</bean>

<!-- setup spring jms TX manager -->
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="poolConnectionFactory"/>
</bean>

<!-- define our activemq component -->
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="connectionFactory" ref="poolConnectionFactory"/>
    <!-- define the jms consumer/producer as transacted -->
    <property name="transacted" value="true"/>
    <!-- setup the transaction manager to use -->
    <!-- if not provided then Camel will automatic use a JmsTransactionManager, however if you
         for instance use a JTA transaction manager then you must configure it -->
    <property name="transactionManager" ref="jmsTransactionManager"/>
</bean>

可以使用 ActiveMQ JMS 并发消费者。

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="pooledConnectionFactory" />
    <property name="transacted" value="true" />
    <property name="concurrentConsumers" value="10" />
    <property name="deliveryPersistent" value="true" />
    <property name="requestTimeout" value="20000" />
    <property name="cacheLevelName"  value="CACHE_CONSUMER" />
</bean>

我终于在这里找到了解决方案http://activemq.2283324.n4.nabble.com/Blocking-transactions-td2354801.html;cid=1431937246689-831

问题是由于预取限制。建议在有多个消费者时将其设置为 0,即使他们不是事务性的。

所以我不得不通过添加 ?destination.consumer.prefetchSize=0 来改变我的骆驼路线 :

<route> 
<from uri="activemq:queue:foo?destination.consumer.prefetchSize=0"/>
<transacted ref="required"/> <process ref="myProcessor"/>
<to uri="activemq:queue:bar"/>
</route>