Jms组件交易和骆驼路线交易

Jms component transacted and camel route transacted

After going through Camel In Action book, I encountered following doubts.

I have below 2 routes

A.
from("file:/home/src") //(A.1) .transacted("required") //(A.2) .bean("dbReader", "readFromDB()") //(A.3) only read from DB .bean("dbReader", "readFromDB()") //(A.4) only read from DB .to("jms:queue:DEST_QUEUE") //(A.5)

Questions:
A.a. Is transacted in (A.2) really required here ?

A.b. If answer to #a is yes, then what should be the associated transaction manager of the "required" policy ? Should it be JmsTransactionManager or JpaTransactionManager ?

A.c. As DEST_QUEUE is at the producer end, so does JMS component in (A.5) need to be transacted ?

B. from("jms:queue:SRC_QUEUE") //(B.1) transactional jms endpoint .transacted("required") //(B.2) .bean("someBean", "someMethod()") //(B.3) simple arithmetic computation .to("jms1:queue:DEST_QUEUE") //(B.4)

SRC_QUEUE and DEST_QUEUE are queues of different jms broker

Questions:

B.a. The JMS component in (B.1) is marked as transacted, so in this case does route need to be transacted as mentioned in (B.2) ?

B.b. As DEST_QUEUE is at the producer end, so does JMS component in (B.4) need to be transacted ?

关于 Camel 事务处理的非常好的问题。

一般说明:在谈论 Camel 事务时,它意味着使用从具有事务能力的系统(如数据库或 JMS 代理)进行的事务处理。路由 中的 transacted 语句必须紧跟在 from 语句之后,因为它总是 与消耗 有关。

A.a. Is transacted in (A.2) really required here ?

不,不是。由于文件系统不支持事务处理,因此在这条路线上没有任何帮助。

A.b. If answer to #a is yes, then ... ?

没有"filesystem transaction manager"

A.c. As DEST_QUEUE is at the producer end, so does JMS component in (A.5) need to be transacted ?

不确定,但我不这么认为。生产者试图将消息传递给经纪人。 事务用于启用回滚,但是如果代理没有收到数据,回滚可以做什么?

B.a. The JMS component in (B.1) is marked as transacted, so in this case does route need to be transacted as mentioned in (B.2) ?

这取决于因为 SRC 和 DEST 在不同的代理上

  • 如果你想要代理之间的端到端交易,你需要使用XA交易管理器,然后你必须将路由标记为transacted
  • 如果您同意 消费者交易,您可以 configure the JMS component for it 并省略 Spring Tx 管理器和 Camel transacted 语句。

澄清最后一点:如果您使用本地代理事务进行消费,Camel 在路由成功处理之前不会提交消息。因此,如果发生任何错误,将发生回滚并重新传递消息。

在大多数情况下,这完全没问题,但是,两个不同的代理仍然可能发生的情况是路由已成功处理,消息已传递到 DEST 代理,但 Camel 不再能够提交给 SRC 代理。然后发生重新投递,路由被再次处理并且 消息被多次投递到 DEST 代理

在我看来,XA 交易的复杂性比本地代理交易的非常罕见的边缘情况更难处理。但这是一个非常主观的意见,可能还取决于您使用的上下文或数据。

重要的是要注意:如果 SRC 和 DEST broker 相同,则本地 broker 交易 100% 足够! 绝对不需要 Spring Tx manager 和 Camel transacted.

B.b. As DEST_QUEUE is at the producer end, so does JMS component in (B.4) need to be transacted ?

与 B.a 的回答相同。

下午好,

我想花一点时间来回答您的问题。我将解决 'B' 个附带问题。

WRT:

B.a。 (B.1)中的JMS组件被标记为transacted,那么在这种情况下路由是否需要像(B.2)中提到的那样被transacted?

是的。源组件和目标组件都需要标记为已处理。将组件标记为已处理将在源和目标会话上启动本地 JMS 事务。请注意,这是两个独立的本地 JMS 事务,由两个独立的 JmsTransactionManagers 管理。

将路由标记为 'transacted' 将启动 JTA 事务上下文。请注意,PlatformTransactionManager 必须是 JtaTransactionManager。当调用 'to' 组件时,消息发送的本地 JMS 事务将与消息获取的本地事务同步。 (JTA 同步事务)。这意味着当远程代理确认发送的提交时,发送将获得回调。那时,接收到的消息将被提交。这是 'dups OK' 事务行为(不是 XA)。您有一个 window 消息已发送,但接收尚未确认。

实际上让这个工作很棘手。这是一个示例:

<!-- ******************** Camel route definition ********************* -->
<camelContext allowUseOriginalMessage="false"
    id="camelContext-Bridge-Local" streamCache="true" trace="true" xmlns="http://camel.apache.org/schema/blueprint">
    <route id="amq-to-amq">
        <from id="from" uri="amqLoc:queue:IN"/>
        <transacted id="trans"/>
        <to id="to" uri="amqRem:queue:OUT"/>
    </route>
</camelContext>
<!-- ********************* Local AMQ configuration ************************** -->
<bean class="org.apache.activemq.camel.component.ActiveMQComponent" id="amqLoc">
    <property name="configuration">
        <bean class="org.apache.camel.component.jms.JmsConfiguration">
            <property name="connectionFactory" ref="AmqCFLocalPool"/>
            <property name="receiveTimeout" value="100000"/>
            <property name="maxConcurrentConsumers" value="3"/>
            <property name="cacheLevelName" value="CACHE_NONE"/>
            <property name="transacted" value="true"/>
        </bean>
    </property>
</bean>
<bean class="org.apache.activemq.jms.pool.PooledConnectionFactory" id="AmqCFLocalPool">
    <property name="maxConnections" value="1"/>
    <property name="idleTimeout" value="0"/>
    <property name="connectionFactory" ref="AmqCFLocal"/>
</bean>
<bean class="org.apache.activemq.ActiveMQConnectionFactory" id="AmqCFLocal">
    <property name="brokerURL" value="tcp://10.0.0.170:61616?jms.prefetchPolicy.all=0"/>
    <property name="userName" value="admin"/>
    <property name="password" value="admin"/>
</bean>
<!-- ********************* Remote AMQ configuration ************************** -->
<bean class="org.apache.activemq.camel.component.ActiveMQComponent" id="amqRem">
    <property name="configuration">
        <bean class="org.apache.camel.component.jms.JmsConfiguration">
            <property name="connectionFactory" ref="AmqCFRemotePool"/>
            <property name="transacted" value="true"/>
        </bean>
    </property>
</bean>
<bean class="org.apache.activemq.jms.pool.PooledConnectionFactory"
    destroy-method="stop" id="AmqCFRemotePool" init-method="start">
    <property name="maxConnections" value="1"/>
    <property name="idleTimeout" value="0"/>
    <property name="connectionFactory" ref="AmqCFRemote"/>
</bean>
<bean class="org.apache.activemq.ActiveMQConnectionFactory" id="AmqCFRemote">
    <property name="brokerURL" value="tcp://10.0.0.171:61616"/>
    <property name="userName" value="admin"/>
    <property name="password" value="admin"/>
</bean>

为您正在使用的 JTA 事务管理器启用 org.springframework.jms.connection.JmsTransactionManager 和 DEBUG/TRACE 级别的 DEBUG 日志记录。