Jms组件交易和骆驼路线交易
Jms component transacted and camel route transacted
After going through Camel In Action book, I encountered following doubts.
I have below 2 routes
A.
from("file:/home/src") //(A.1)
.transacted("required") //(A.2)
.bean("dbReader", "readFromDB()") //(A.3) only read from DB
.bean("dbReader", "readFromDB()") //(A.4) only read from DB
.to("jms:queue:DEST_QUEUE") //(A.5)
Questions:
A.a. Is transacted in (A.2) really required here ?
A.b. If answer to #a is yes, then what should be the associated transaction manager of the "required" policy ? Should it be JmsTransactionManager or JpaTransactionManager ?
A.c. As DEST_QUEUE is at the producer end, so does JMS component in (A.5) need to be transacted ?
B.
from("jms:queue:SRC_QUEUE") //(B.1) transactional jms endpoint
.transacted("required") //(B.2)
.bean("someBean", "someMethod()") //(B.3) simple arithmetic computation
.to("jms1:queue:DEST_QUEUE") //(B.4)
SRC_QUEUE and DEST_QUEUE are queues of different jms broker
Questions:
B.a. The JMS component in (B.1) is marked as transacted, so in this case does route need to be transacted as mentioned in (B.2) ?
B.b. As DEST_QUEUE is at the producer end, so does JMS component in (B.4) need to be transacted ?
关于 Camel 事务处理的非常好的问题。
一般说明:在谈论 Camel 事务时,它意味着使用从具有事务能力的系统(如数据库或 JMS 代理)进行的事务处理。路由 中的 transacted
语句必须紧跟在 from
语句之后,因为它总是 与消耗 有关。
A.a. Is transacted in (A.2) really required here ?
不,不是。由于文件系统不支持事务处理,因此在这条路线上没有任何帮助。
A.b. If answer to #a is yes, then ... ?
没有"filesystem transaction manager"
A.c. As DEST_QUEUE is at the producer end, so does JMS component in (A.5) need to be transacted ?
不确定,但我不这么认为。生产者试图将消息传递给经纪人。 事务用于启用回滚,但是如果代理没有收到数据,回滚可以做什么?
B.a. The JMS component in (B.1) is marked as transacted, so in this case does route need to be transacted as mentioned in (B.2) ?
这取决于因为 SRC 和 DEST 在不同的代理上。
- 如果你想要代理之间的端到端交易,你需要使用XA交易管理器,然后你必须将路由标记为
transacted
。
- 如果您同意 消费者交易,您可以 configure the JMS component for it 并省略 Spring Tx 管理器和 Camel
transacted
语句。
澄清最后一点:如果您使用本地代理事务进行消费,Camel 在路由成功处理之前不会提交消息。因此,如果发生任何错误,将发生回滚并重新传递消息。
在大多数情况下,这完全没问题,但是,两个不同的代理仍然可能发生的情况是路由已成功处理,消息已传递到 DEST 代理,但 Camel 不再能够提交给 SRC 代理。然后发生重新投递,路由被再次处理并且 消息被多次投递到 DEST 代理 。
在我看来,XA 交易的复杂性比本地代理交易的非常罕见的边缘情况更难处理。但这是一个非常主观的意见,可能还取决于您使用的上下文或数据。
重要的是要注意:如果 SRC 和 DEST broker 相同,则本地 broker 交易 100% 足够! 绝对不需要 Spring Tx manager 和 Camel transacted
.
B.b. As DEST_QUEUE is at the producer end, so does JMS component in (B.4) need to be transacted ?
与 B.a 的回答相同。
下午好,
我想花一点时间来回答您的问题。我将解决 'B' 个附带问题。
WRT:
B.a。 (B.1)中的JMS组件被标记为transacted,那么在这种情况下路由是否需要像(B.2)中提到的那样被transacted?
是的。源组件和目标组件都需要标记为已处理。将组件标记为已处理将在源和目标会话上启动本地 JMS 事务。请注意,这是两个独立的本地 JMS 事务,由两个独立的 JmsTransactionManagers 管理。
将路由标记为 'transacted' 将启动 JTA 事务上下文。请注意,PlatformTransactionManager 必须是 JtaTransactionManager。当调用 'to' 组件时,消息发送的本地 JMS 事务将与消息获取的本地事务同步。 (JTA 同步事务)。这意味着当远程代理确认发送的提交时,发送将获得回调。那时,接收到的消息将被提交。这是 'dups OK' 事务行为(不是 XA)。您有一个 window 消息已发送,但接收尚未确认。
实际上让这个工作很棘手。这是一个示例:
<!-- ******************** Camel route definition ********************* -->
<camelContext allowUseOriginalMessage="false"
id="camelContext-Bridge-Local" streamCache="true" trace="true" xmlns="http://camel.apache.org/schema/blueprint">
<route id="amq-to-amq">
<from id="from" uri="amqLoc:queue:IN"/>
<transacted id="trans"/>
<to id="to" uri="amqRem:queue:OUT"/>
</route>
</camelContext>
<!-- ********************* Local AMQ configuration ************************** -->
<bean class="org.apache.activemq.camel.component.ActiveMQComponent" id="amqLoc">
<property name="configuration">
<bean class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="AmqCFLocalPool"/>
<property name="receiveTimeout" value="100000"/>
<property name="maxConcurrentConsumers" value="3"/>
<property name="cacheLevelName" value="CACHE_NONE"/>
<property name="transacted" value="true"/>
</bean>
</property>
</bean>
<bean class="org.apache.activemq.jms.pool.PooledConnectionFactory" id="AmqCFLocalPool">
<property name="maxConnections" value="1"/>
<property name="idleTimeout" value="0"/>
<property name="connectionFactory" ref="AmqCFLocal"/>
</bean>
<bean class="org.apache.activemq.ActiveMQConnectionFactory" id="AmqCFLocal">
<property name="brokerURL" value="tcp://10.0.0.170:61616?jms.prefetchPolicy.all=0"/>
<property name="userName" value="admin"/>
<property name="password" value="admin"/>
</bean>
<!-- ********************* Remote AMQ configuration ************************** -->
<bean class="org.apache.activemq.camel.component.ActiveMQComponent" id="amqRem">
<property name="configuration">
<bean class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="AmqCFRemotePool"/>
<property name="transacted" value="true"/>
</bean>
</property>
</bean>
<bean class="org.apache.activemq.jms.pool.PooledConnectionFactory"
destroy-method="stop" id="AmqCFRemotePool" init-method="start">
<property name="maxConnections" value="1"/>
<property name="idleTimeout" value="0"/>
<property name="connectionFactory" ref="AmqCFRemote"/>
</bean>
<bean class="org.apache.activemq.ActiveMQConnectionFactory" id="AmqCFRemote">
<property name="brokerURL" value="tcp://10.0.0.171:61616"/>
<property name="userName" value="admin"/>
<property name="password" value="admin"/>
</bean>
为您正在使用的 JTA 事务管理器启用 org.springframework.jms.connection.JmsTransactionManager 和 DEBUG/TRACE 级别的 DEBUG 日志记录。
After going through Camel In Action book, I encountered following doubts.
I have below 2 routes
A.
from("file:/home/src") //(A.1) .transacted("required") //(A.2) .bean("dbReader", "readFromDB()") //(A.3) only read from DB .bean("dbReader", "readFromDB()") //(A.4) only read from DB .to("jms:queue:DEST_QUEUE") //(A.5)
Questions:
A.a. Is transacted in (A.2) really required here ?A.b. If answer to #a is yes, then what should be the associated transaction manager of the "required" policy ? Should it be JmsTransactionManager or JpaTransactionManager ?
A.c. As DEST_QUEUE is at the producer end, so does JMS component in (A.5) need to be transacted ?
B.
from("jms:queue:SRC_QUEUE") //(B.1) transactional jms endpoint .transacted("required") //(B.2) .bean("someBean", "someMethod()") //(B.3) simple arithmetic computation .to("jms1:queue:DEST_QUEUE") //(B.4)
SRC_QUEUE and DEST_QUEUE are queues of different jms broker
Questions:
B.a. The JMS component in (B.1) is marked as transacted, so in this case does route need to be transacted as mentioned in (B.2) ?
B.b. As DEST_QUEUE is at the producer end, so does JMS component in (B.4) need to be transacted ?
关于 Camel 事务处理的非常好的问题。
一般说明:在谈论 Camel 事务时,它意味着使用从具有事务能力的系统(如数据库或 JMS 代理)进行的事务处理。路由 中的 transacted
语句必须紧跟在 from
语句之后,因为它总是 与消耗 有关。
A.a. Is transacted in (A.2) really required here ?
不,不是。由于文件系统不支持事务处理,因此在这条路线上没有任何帮助。
A.b. If answer to #a is yes, then ... ?
没有"filesystem transaction manager"
A.c. As DEST_QUEUE is at the producer end, so does JMS component in (A.5) need to be transacted ?
不确定,但我不这么认为。生产者试图将消息传递给经纪人。 事务用于启用回滚,但是如果代理没有收到数据,回滚可以做什么?
B.a. The JMS component in (B.1) is marked as transacted, so in this case does route need to be transacted as mentioned in (B.2) ?
这取决于因为 SRC 和 DEST 在不同的代理上。
- 如果你想要代理之间的端到端交易,你需要使用XA交易管理器,然后你必须将路由标记为
transacted
。 - 如果您同意 消费者交易,您可以 configure the JMS component for it 并省略 Spring Tx 管理器和 Camel
transacted
语句。
澄清最后一点:如果您使用本地代理事务进行消费,Camel 在路由成功处理之前不会提交消息。因此,如果发生任何错误,将发生回滚并重新传递消息。
在大多数情况下,这完全没问题,但是,两个不同的代理仍然可能发生的情况是路由已成功处理,消息已传递到 DEST 代理,但 Camel 不再能够提交给 SRC 代理。然后发生重新投递,路由被再次处理并且 消息被多次投递到 DEST 代理 。
在我看来,XA 交易的复杂性比本地代理交易的非常罕见的边缘情况更难处理。但这是一个非常主观的意见,可能还取决于您使用的上下文或数据。
重要的是要注意:如果 SRC 和 DEST broker 相同,则本地 broker 交易 100% 足够! 绝对不需要 Spring Tx manager 和 Camel transacted
.
B.b. As DEST_QUEUE is at the producer end, so does JMS component in (B.4) need to be transacted ?
与 B.a 的回答相同。
下午好,
我想花一点时间来回答您的问题。我将解决 'B' 个附带问题。
WRT:
B.a。 (B.1)中的JMS组件被标记为transacted,那么在这种情况下路由是否需要像(B.2)中提到的那样被transacted?
是的。源组件和目标组件都需要标记为已处理。将组件标记为已处理将在源和目标会话上启动本地 JMS 事务。请注意,这是两个独立的本地 JMS 事务,由两个独立的 JmsTransactionManagers 管理。
将路由标记为 'transacted' 将启动 JTA 事务上下文。请注意,PlatformTransactionManager 必须是 JtaTransactionManager。当调用 'to' 组件时,消息发送的本地 JMS 事务将与消息获取的本地事务同步。 (JTA 同步事务)。这意味着当远程代理确认发送的提交时,发送将获得回调。那时,接收到的消息将被提交。这是 'dups OK' 事务行为(不是 XA)。您有一个 window 消息已发送,但接收尚未确认。
实际上让这个工作很棘手。这是一个示例:
<!-- ******************** Camel route definition ********************* -->
<camelContext allowUseOriginalMessage="false"
id="camelContext-Bridge-Local" streamCache="true" trace="true" xmlns="http://camel.apache.org/schema/blueprint">
<route id="amq-to-amq">
<from id="from" uri="amqLoc:queue:IN"/>
<transacted id="trans"/>
<to id="to" uri="amqRem:queue:OUT"/>
</route>
</camelContext>
<!-- ********************* Local AMQ configuration ************************** -->
<bean class="org.apache.activemq.camel.component.ActiveMQComponent" id="amqLoc">
<property name="configuration">
<bean class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="AmqCFLocalPool"/>
<property name="receiveTimeout" value="100000"/>
<property name="maxConcurrentConsumers" value="3"/>
<property name="cacheLevelName" value="CACHE_NONE"/>
<property name="transacted" value="true"/>
</bean>
</property>
</bean>
<bean class="org.apache.activemq.jms.pool.PooledConnectionFactory" id="AmqCFLocalPool">
<property name="maxConnections" value="1"/>
<property name="idleTimeout" value="0"/>
<property name="connectionFactory" ref="AmqCFLocal"/>
</bean>
<bean class="org.apache.activemq.ActiveMQConnectionFactory" id="AmqCFLocal">
<property name="brokerURL" value="tcp://10.0.0.170:61616?jms.prefetchPolicy.all=0"/>
<property name="userName" value="admin"/>
<property name="password" value="admin"/>
</bean>
<!-- ********************* Remote AMQ configuration ************************** -->
<bean class="org.apache.activemq.camel.component.ActiveMQComponent" id="amqRem">
<property name="configuration">
<bean class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="AmqCFRemotePool"/>
<property name="transacted" value="true"/>
</bean>
</property>
</bean>
<bean class="org.apache.activemq.jms.pool.PooledConnectionFactory"
destroy-method="stop" id="AmqCFRemotePool" init-method="start">
<property name="maxConnections" value="1"/>
<property name="idleTimeout" value="0"/>
<property name="connectionFactory" ref="AmqCFRemote"/>
</bean>
<bean class="org.apache.activemq.ActiveMQConnectionFactory" id="AmqCFRemote">
<property name="brokerURL" value="tcp://10.0.0.171:61616"/>
<property name="userName" value="admin"/>
<property name="password" value="admin"/>
</bean>
为您正在使用的 JTA 事务管理器启用 org.springframework.jms.connection.JmsTransactionManager 和 DEBUG/TRACE 级别的 DEBUG 日志记录。