Java 配置入站文件适配器事务管理
Java Config Inbound File Adapter Transaction Management
我有一个简单的 IntegrationFlow
,我正试图通过事务管理来保护它。从高层次上看,它会轮询目录中的文件,将该文件 gzip 到磁盘上的新位置,然后将第二个文件上传到 S3。如果提交,我想将两条路径发布到一个频道(它们将被删除);如果它回滚,我想将它们发布到不同的频道(它们将被存档)。
我的(缩写)尝试配置如下所示:
@Bean
public IntegrationFlow fromFileFlow(@Qualifier("inboundMessageDirectory") Path inboundMessageDirectory,
@Qualifier("consumableFileFilter") FileListFilter<File> fileListFilter,
@Qualifier(ChannelNames.TO_GZIP_SERVICE) MessageChannel outbound,
@Qualifier("transactionSynchronizationFactory") TransactionSynchronizationFactory transactionSynchronizationFactory) {
return IntegrationFlows
.from(s -> s.file(inboundMessageDirectory.toFile()),
e -> e.poller(Pollers
.fixedDelay(100)
.transactionSynchronizationFactory(transactionSynchronizationFactory)
.transactional(new PseudoTransactionManager())
.get()))
.channel(outbound)
.get();
}
服务看起来像:
@ServiceActivator(inputChannel = ChannelNames.TO_GZIP_SERVICE, outputChannel = ChannelNames.TO_S3_SERVICE)
public Path gzip(@Payload Path path, @Header(ApplicationHeaders.REFERENCE_ID) String referenceId, @Header(ApplicationHeaders.DATA_TYPE) String dataType,
@Header(ApplicationHeaders.BUCKET_END) long bucketEnd) throws IOException {
// ...
}
@ServiceActivator(inputChannel = ChannelNames.TO_S3_SERVICE)
@Retryable(interceptor = RETRY_INTERCEPTOR_BEAN_NAME)
public void sendToS3(@Payload Path path, @Header(ApplicationHeaders.BUCKET_END) long bucketStart,
@Header(ApplicationHeaders.DATA_TYPE) String dataType) throws IOException {
// ...
}
我的自定义 TransactionSynchronizationProcessor
(我使用 DefaultTransactionSynchronizationFactory
)大致如下实现(未显示的代码从消息有效负载中提取路径并将其存储在 IntegrationResourceHolder 的属性中):
@Override
public void processBeforeCommit(IntegrationResourceHolder holder) {
updatePaths(holder);
}
@Override
public void processAfterCommit(IntegrationResourceHolder holder) {
updateAndSend(successChannel, holder);
}
@Override
public void processAfterRollback(IntegrationResourceHolder holder) {
updateAndSend(failureChannel, holder);
}
我的理解是,因为所有的间隙渠道都是直接渠道,交易也应该包括服务。因为它会在提交之前、提交之后和回滚之后进行更新,所以我希望它在开始时访问消息,获取解压后的路径,然后在最后访问它并获得压缩后的路径——然后尝试采取行动他们。但是,只有解压缩的路径会被提取。
显然,我对交易的应用(和理解)缺少一些东西。我实现所需行为的正确方法是什么?
不对,你有点误会了。
有了 transactional
和 transactionSynchronizationFactory
,我们只担心 source
。作为数据的最终结果对于事务资源并不重要。我们只需要知道提交或回滚事务的状态。
这就是为什么IntegrationResourceHolder
有一个message
属性要担心TX端携带源数据。
当我们回滚时,尝试在 TX 的末尾成像一个 DB 事务和数据。对,没有那个数据!只有TX的输入重要。
但是,您可以通过额外的 ResourceHolder
和 ResourceHolderSynchronization
来满足您的要求,以便在 S3 存储之前的某个地方使用。简单的 ThreadLocal
也可能有帮助。但一定要在TX结束时正确清除。
从另一边考虑使用来自 s.file
的消息的 headers
携带您需要的一切。为此,AbstractMessageSourceAdvice
会很有用,因为 Spring 集成 4.2.
我有一个简单的 IntegrationFlow
,我正试图通过事务管理来保护它。从高层次上看,它会轮询目录中的文件,将该文件 gzip 到磁盘上的新位置,然后将第二个文件上传到 S3。如果提交,我想将两条路径发布到一个频道(它们将被删除);如果它回滚,我想将它们发布到不同的频道(它们将被存档)。
我的(缩写)尝试配置如下所示:
@Bean
public IntegrationFlow fromFileFlow(@Qualifier("inboundMessageDirectory") Path inboundMessageDirectory,
@Qualifier("consumableFileFilter") FileListFilter<File> fileListFilter,
@Qualifier(ChannelNames.TO_GZIP_SERVICE) MessageChannel outbound,
@Qualifier("transactionSynchronizationFactory") TransactionSynchronizationFactory transactionSynchronizationFactory) {
return IntegrationFlows
.from(s -> s.file(inboundMessageDirectory.toFile()),
e -> e.poller(Pollers
.fixedDelay(100)
.transactionSynchronizationFactory(transactionSynchronizationFactory)
.transactional(new PseudoTransactionManager())
.get()))
.channel(outbound)
.get();
}
服务看起来像:
@ServiceActivator(inputChannel = ChannelNames.TO_GZIP_SERVICE, outputChannel = ChannelNames.TO_S3_SERVICE)
public Path gzip(@Payload Path path, @Header(ApplicationHeaders.REFERENCE_ID) String referenceId, @Header(ApplicationHeaders.DATA_TYPE) String dataType,
@Header(ApplicationHeaders.BUCKET_END) long bucketEnd) throws IOException {
// ...
}
@ServiceActivator(inputChannel = ChannelNames.TO_S3_SERVICE)
@Retryable(interceptor = RETRY_INTERCEPTOR_BEAN_NAME)
public void sendToS3(@Payload Path path, @Header(ApplicationHeaders.BUCKET_END) long bucketStart,
@Header(ApplicationHeaders.DATA_TYPE) String dataType) throws IOException {
// ...
}
我的自定义 TransactionSynchronizationProcessor
(我使用 DefaultTransactionSynchronizationFactory
)大致如下实现(未显示的代码从消息有效负载中提取路径并将其存储在 IntegrationResourceHolder 的属性中):
@Override
public void processBeforeCommit(IntegrationResourceHolder holder) {
updatePaths(holder);
}
@Override
public void processAfterCommit(IntegrationResourceHolder holder) {
updateAndSend(successChannel, holder);
}
@Override
public void processAfterRollback(IntegrationResourceHolder holder) {
updateAndSend(failureChannel, holder);
}
我的理解是,因为所有的间隙渠道都是直接渠道,交易也应该包括服务。因为它会在提交之前、提交之后和回滚之后进行更新,所以我希望它在开始时访问消息,获取解压后的路径,然后在最后访问它并获得压缩后的路径——然后尝试采取行动他们。但是,只有解压缩的路径会被提取。
显然,我对交易的应用(和理解)缺少一些东西。我实现所需行为的正确方法是什么?
不对,你有点误会了。
有了 transactional
和 transactionSynchronizationFactory
,我们只担心 source
。作为数据的最终结果对于事务资源并不重要。我们只需要知道提交或回滚事务的状态。
这就是为什么IntegrationResourceHolder
有一个message
属性要担心TX端携带源数据。
当我们回滚时,尝试在 TX 的末尾成像一个 DB 事务和数据。对,没有那个数据!只有TX的输入重要。
但是,您可以通过额外的 ResourceHolder
和 ResourceHolderSynchronization
来满足您的要求,以便在 S3 存储之前的某个地方使用。简单的 ThreadLocal
也可能有帮助。但一定要在TX结束时正确清除。
从另一边考虑使用来自 s.file
的消息的 headers
携带您需要的一切。为此,AbstractMessageSourceAdvice
会很有用,因为 Spring 集成 4.2.