Transaction support with async channel (executor channel)

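I read data from a queue and pass it to an executor channel for processing. Because of that hand-off, the transaction scope of the main thread no longer applies. How can I create a transaction that starts at an executor channel which has multiple direct channels downstream? Here is the configuration: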

Read data from some queue (gateway)
<int:channel id="mainChannel">
    <int:interceptors>
        <int:wire-tap channel="channel1"/>
        <int:wire-tap channel="channel2"/>
        <int:wire-tap channel="channel3"/>
    </int:interceptors>
</int:channel>
<int:channel id="channel1">
    <int:dispatcher task-executor="exec1"/>
</int:channel>
<int:channel id="channel2">
    <int:dispatcher task-executor="exec2"/>
</int:channel>
<int:channel id="channel3">
    <int:dispatcher task-executor="exec3"/>
</int:channel>

<int:chain id="id" input-channel="Channel1">
    <int:header-value-router header-name="headerName">
        <int:mapping value="Header1" channel="Channel4"/>
        <int:mapping value="Header2" channel="Channel5"/>
        <int:mapping value="Header3" channel="Channel6"/>
        <int:mapping value="Header4" channel="Channel7"/>
        <int:mapping value="Header5" channel="Channel8"/>
        <int:mapping value="Header6" channel="Channel9"/>
    </int:header-value-router>
</int:chain>

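Channel4 through Channel9 (all direct channels) then store the data in the DB according to some business logic. The requirement is to wrap the flow from channel1 through Channel4-Channel9 in a single transaction. I searched online but could not find anything.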

Edit: this is what I am doing now

Read data from some queue (gateway)
<int:channel id="mainChannel">
    <int:interceptors>
        <int:wire-tap channel="channel1"/>
        <int:wire-tap channel="channel2"/>
        <int:wire-tap channel="channel3"/>
    </int:interceptors>
</int:channel>
<int:channel id="channel1">
    <int:dispatcher task-executor="exec1"/>
</int:channel>
<int:channel id="channel2">
    <int:dispatcher task-executor="exec2"/>
</int:channel>
<int:channel id="channel3">
    <int:dispatcher task-executor="exec3"/>
</int:channel>

<int:chain id="id" input-channel="Channel1">
    <int:service-activator ref="someRef" method="name1">
        <ref bean="txAdvice"/>
    </int:service-activator>
    <int:header-value-router header-name="headerName">
        <int:mapping value="Header1" channel="Channel4"/>
        <int:mapping value="Header2" channel="Channel5"/>
        <int:mapping value="Header3" channel="Channel6"/>
        <int:mapping value="Header4" channel="Channel7"/>
        <int:mapping value="Header5" channel="Channel8"/>
        <int:mapping value="Header6" channel="Channel9"/>
    </int:header-value-router>
    <bean id="transactionManager"
          class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource"/>
    </bean>

    <tx:advice id="txAdvice">
        <tx:attributes>
            <tx:method name="*"/>
        </tx:attributes>
    </tx:advice>
</int:chain>

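Channel4, Channel5 ... Channel9 all insert into the database. What I want is for the transaction to be rolled back if any of the inserts fails. With this configuration, the transaction is still not rolled back when one of the DB inserts fails.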

Edit 3

<tx:advice id="txAdvice">
    <tx:attributes>
        <tx:method name="send" propagation="REQUIRES_NEW"/>
    </tx:attributes>
</tx:advice>
<aop:config>
    <aop:advisor advice-ref="txAdvice" pointcut="bean(Channel1)"/>
</aop:config>

Edit 4 [resolved configuration]

<int:channel id="mainChannel">
    <int:interceptors>
        <int:wire-tap channel="channel1"/>
        <int:wire-tap channel="channel2"/>
        <int:wire-tap channel="channel3"/>
    </int:interceptors>
</int:channel>

<int:service-activator ref="gatewayID" method="sendToDB" input-channel="channel1"/>

<int:gateway id="gatewayID"
             service-interface="com.*.*.*.TransactionalGateway"
             error-channel="errorChannel"
             default-request-channel="OutChannel"/>

<int:chain id="id" input-channel="OutChannel">
    <int:header-value-router header-name="headerName">
        <int:mapping value="Header1" channel="Channel4"/>
        <int:mapping value="Header2" channel="Channel5"/>
        <int:mapping value="Header3" channel="Channel6"/>
        <int:mapping value="Header4" channel="Channel7"/>
        <int:mapping value="Header5" channel="Channel8"/>
        <int:mapping value="Header6" channel="Channel9"/>
    </int:header-value-router>
</int:chain>

public interface TransactionalGateway {

    @Transactional
    void sendToDB(Message<?> m);

}

Error

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#10': Cannot resolve reference to bean 'org.springframework.integration.config.ServiceActivatorFactoryBean#2' while setting bean property 'handler'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ServiceActivatorFactoryBean#2': FactoryBean threw exception on object creation; nested exception is java.lang.IllegalArgumentException: Found ambiguous parameter type [class java.lang.Void] for method match: [public final java.lang.Class[] com.sun.proxy.$Proxy46.getProxiedInterfaces(), public final void com.sun.proxy.$Proxy46.setTargetSource(org.springframework.aop.TargetSource), public final void com.sun.proxy.$Proxy46.setPreFiltered(boolean), public final boolean com.sun.proxy.$Proxy46.removeAdvisor(org.springframework.aop.Advisor), public final void com.sun.proxy.$Proxy46.removeAdvisor(int) throws org.springframework.aop.framework.AopConfigException, public final boolean com.sun.proxy.$Proxy46.isInterfaceProxied(java.lang.Class), public final void com.sun.proxy.$Proxy46.addAdvice(org.aopalliance.aop.Advice) throws org.springframework.aop.framework.AopConfigException]
    at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveReference(BeanDefinitionValueResolver.java:359)
    at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveValueIfNecessary(BeanDefinitionValueResolver.java:108)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyPropertyValues(AbstractAutowireCapableBeanFactory.java:1522)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1269)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:551)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:481)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getObject(AbstractBeanFactory.java:312)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:308)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:738)
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:542)
    at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139)
    at com.citi.loads.framework.StartLoadsApp.lambda$loadContexts(StartLoadsApp.java:133)
    at java.util.HashMap$KeySet.forEach(HashMap.java:933)
    at com.citi.loads.framework.StartLoadsApp.loadContexts(StartLoadsApp.java:129)
    at com.citi.loads.framework.StartLoadsApp.runWithFramework(StartLoadsApp.java:109)
    at com.citi.loads.framework.StartLoadsApp.main(StartLoadsApp.java:33)
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ServiceActivatorFactoryBean#2': FactoryBean threw exception on object creation; nested exception is java.lang.IllegalArgumentException: Found ambiguous parameter type [class java.lang.Void] for method match: [public final java.lang.Class[] com.sun.proxy.$Proxy46.getProxiedInterfaces(), public final void com.sun.proxy.$Proxy46.setTargetSource(org.springframework.aop.TargetSource), public final void com.sun.proxy.$Proxy46.setPreFiltered(boolean), public final boolean com.sun.proxy.$Proxy46.removeAdvisor(org.springframework.aop.Advisor), public final void com.sun.proxy.$Proxy46.removeAdvisor(int) throws org.springframework.aop.framework.AopConfigException, public final boolean com.sun.proxy.$Proxy46.isInterfaceProxied(java.lang.Class), public final void com.sun.proxy.$Proxy46.addAdvice(org.aopalliance.aop.Advice) throws org.springframework.aop.framework.AopConfigException]
    at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:185)
    at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.getObjectFromFactoryBean(FactoryBeanRegistrySupport.java:103)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getObjectForBeanInstance(AbstractBeanFactory.java:1646)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:254)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)
    at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveReference(BeanDefinitionValueResolver.java:351)
    ... 18 more
Caused by: java.lang.IllegalArgumentException: Found ambiguous parameter type [class java.lang.Void] for method match: [public final java.lang.Class[] com.sun.proxy.$Proxy46.getProxiedInterfaces(), public final void com.sun.proxy.$Proxy46.setTargetSource(org.springframework.aop.TargetSource), public final void com.sun.proxy.$Proxy46.setPreFiltered(boolean), public final boolean com.sun.proxy.$Proxy46.removeAdvisor(org.springframework.aop.Advisor), public final void com.sun.proxy.$Proxy46.removeAdvisor(int) throws org.springframework.aop.framework.AopConfigException, public final boolean com.sun.proxy.$Proxy46.isInterfaceProxied(java.lang.Class), public final void com.sun.proxy.$Proxy46.addAdvice(org.aopalliance.aop.Advice) throws org.springframework.aop.framework.AopConfigException]
    at org.springframework.util.Assert.isNull(Assert.java:113)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.findHandlerMethodsForTarget(MessagingMethodInvokerHelper.java:499)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:226)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:149)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:144)
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.<init>(MethodInvokingMessageProcessor.java:60)
    at org.springframework.integration.handler.ServiceActivatingHandler.<init>(ServiceActivatingHandler.java:37)
    at org.springframework.integration.config.ServiceActivatorFactoryBean.createMethodInvokingHandler(ServiceActivatorFactoryBean.java:57)
    at org.springframework.integration.config.AbstractStandardMessageHandlerFactoryBean.createHandler(AbstractStandardMessageHandlerFactoryBean.java:117)
    at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.createHandlerInternal(AbstractSimpleMessageHandlerFactoryBean.java:184)
    at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.getObject(AbstractSimpleMessageHandlerFactoryBean.java:172)
    at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.getObject(AbstractSimpleMessageHandlerFactoryBean.java:57)
    at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:178)
    ... 23 more

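A transaction is bound to a single thread boundary, so you cannot start a transaction, send a message to that executor channel, and expect the transaction to extend to the channel's subscriber on the other side, which is going to run on a different thread.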

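However, if your point is slightly different (it is not clear from your question) and you really want to start the transaction when your chain consumes the message and sends it to one or several of those channels in the router, then that is possible via the <int:request-handler-advice-chain> with a nested <bean> configuration for the TransactionHandleMessageAdvice.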
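As a rough sketch of that suggestion, the chain from the question's second attempt could declare the advice on its first handler. This assumes a Spring Integration version that ships TransactionHandleMessageAdvice, reuses the someRef, name1, transactionManager and dataSource names from the question, and picks PROPAGATION_REQUIRED purely for illustration. It is not a tested configuration.

```xml
<!-- Sketch only: bean and channel names are taken from the question. -->
<int:chain id="id" input-channel="channel1">
    <int:service-activator ref="someRef" method="name1">
        <int:request-handler-advice-chain>
            <!-- The advice wraps the handler's handleMessage() call, so the
                 transaction starts on the executor thread and also covers what
                 runs synchronously after it: the router and the direct
                 channels it sends to. -->
            <bean class="org.springframework.integration.transaction.TransactionHandleMessageAdvice">
                <property name="transactionManager" ref="transactionManager"/>
                <property name="transactionAttributes">
                    <props>
                        <!-- Assumption: one REQUIRED transaction for every method. -->
                        <prop key="*">PROPAGATION_REQUIRED</prop>
                    </props>
                </property>
            </bean>
        </int:request-handler-advice-chain>
    </int:service-activator>
    <int:header-value-router header-name="headerName">
        <int:mapping value="Header1" channel="Channel4"/>
        <!-- remaining mappings as in the question -->
    </int:header-value-router>
</int:chain>

<!-- Declared at the top level of the context, not inside the chain. -->
<bean id="transactionManager"
      class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource"/>
</bean>
```

For the chain to reach the router, name1 has to return a value. The transaction then only spans Channel4 through Channel9 as long as those stay direct channels, because any further executor or queue channel would again move the work to another thread.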

See the documentation for more about transactions: https://docs.spring.io/spring-integration/reference/html/transactions.html#transactions

See also my older answer: Keep transaction within Spring Integration flow