同一个流程中的多个事务同步工厂

Multiple transaction synchronization factories in the same flow

我有以下流程:SFTP 入站流适配器读取文件,该文件是 JSON。那么:

这就是流程的精髓:

<int-sftp:inbound-streaming-channel-adapter
  session-factory="mySftpSessionFactory"
  channel="channel1"
  filename-pattern="*.JSON"
  remote-directory-expression="'/mypath'"
  max-fetch-size="10">
  <int:poller max-messages-per-poll="1" fixed-delay="1"
    time-unit="MINUTES" error-channel="unexpectedErrorChannel">
    <int:transactional
      synchronization-factory="synchFactory1"
      transaction-manager="transactionManager" />
    </int:poller>
</int-sftp:inbound-streaming-channel-adapter>

<int:transaction-synchronization-factory id="synchFactory1">
  <int:after-commit channel="onCommitRemoteFileRenameChannel" />
</int:transaction-synchronization-factory>

<int:channel id="channel1" />

<int:chain input-channel="channel1" output-channel="nullChannel">
  <int:stream-transformer charset="UTF-8" />
  <int:json-to-object-transformer type="com.example.MyDto" />
  <int:transformer ref="myTransformer">
    <int:transactional synchronization-factory="synchFactory2"
      transaction-manager="transactionManager" />
  </int:transformer>
</int:chain>

<int:transaction-synchronization-factory id="synchFactory2">
  <int:after-commit channel="onCommitSecondFlowChannel" />
</int:transaction-synchronization-factory>

<int:channel id="onCommitRemoteFileRenameChannel" />
<int-sftp:outbound-gateway
  session-factory="mySftpSessionFactory"
  request-channel="onCommitRemoteFileRenameChannel" 
  command="mv"
  expression="headers[T(org.springframework.integration.file.FileHeaders).REMOTE_DIRECTORY].concat('/'.concat(headers[T(org.springframework.integration.file.FileHeaders).REMOTE_FILE]))"
  rename-expression="headers[T(org.springframework.integration.file.FileHeaders).REMOTE_DIRECTORY].concat('/'.concat(headers[T(org.springframework.integration.file.FileHeaders).REMOTE_FILE].concat('.done')))"
  requires-reply="false"
  reply-channel="nullChannel" />

在转换器中声明的第二个 <int:transactional> 并不意味着实际打开一个新事务(默认传播应该是必需的,所以它应该加入轮询器打开的主事务),而只是为了添加第二个同步,将转换器返回的消息发送到名为 onCommitSecondFlowChannel 的通道以供进一步处理。

然而,这似乎无法正常工作:虽然正确执行了远程文件重命名,但没有应用第二次同步,因为我没有从 trasnformer 获得任何消息到 onCommitSecondFlowChannel。 我尝试调试,发现两个 <int:transacationl-synchronization-factory> 都被解析了,但是第二个方法 ExpressionEvaluatingTransactionSynchronizationProcessor.processAfterCommit(IntegrationResourceHolder) 从未执行过。

有没有一种方法可以获得此结果,而无需在转换器之后声明服务激活器和网关以将消息放置在第二个流通道上?

我认为您使用额外的 tx-sync 使您的流程过于复杂。 它实际上并不是为了在流程中间做某事而设计的。 从我们开始交易的那一刻起,它更像是一些全局钩子。因为你不在那个变压器里,所以不能应用。

我们可能会在未来对其进行修改,因为我看到了你的感受,如果此时无法启动新的 TX,这样的 tx-sync 应该加入现有的 TX 是合乎逻辑的。

无论如何我不会那样做,因为它使流程不直观。

我看到你有这个 <int:chain input-channel="channel1" output-channel="nullChannel"> - nullChannel 作为 <transformer>.

之后的输出

那么,如何完全删除 <transactional> 并将 onCommitSecondFlowChannel 作为此 <chain> 的输出?无论如何,这看起来像是您流程的结束,因此无需将某些事情推迟到交易结束。