Java 配置入站文件适配器事务管理

Java Config Inbound File Adapter Transaction Management

我有一个简单的 IntegrationFlow,我正试图通过事务管理来保护它。从高层次上看,它会轮询目录中的文件,将该文件 gzip 到磁盘上的新位置,然后将第二个文件上传到 S3。如果提交,我想将两条路径发布到一个频道(它们将被删除);如果它回滚,我想将它们发布到不同的频道(它们将被存档)。

我的(缩写)尝试配置如下所示:

@Bean
public IntegrationFlow fromFileFlow(@Qualifier("inboundMessageDirectory") Path inboundMessageDirectory,
                                    @Qualifier("consumableFileFilter") FileListFilter<File> fileListFilter,
                                    @Qualifier(ChannelNames.TO_GZIP_SERVICE) MessageChannel outbound,
                                    @Qualifier("transactionSynchronizationFactory") TransactionSynchronizationFactory transactionSynchronizationFactory) {
    return IntegrationFlows
            .from(s -> s.file(inboundMessageDirectory.toFile()),
                    e -> e.poller(Pollers
                            .fixedDelay(100)
                            .transactionSynchronizationFactory(transactionSynchronizationFactory)
                            .transactional(new PseudoTransactionManager())
                            .get()))
            .channel(outbound)
            .get();
}

服务看起来像:

@ServiceActivator(inputChannel = ChannelNames.TO_GZIP_SERVICE, outputChannel = ChannelNames.TO_S3_SERVICE)
public Path gzip(@Payload Path path, @Header(ApplicationHeaders.REFERENCE_ID) String referenceId, @Header(ApplicationHeaders.DATA_TYPE) String dataType,
                 @Header(ApplicationHeaders.BUCKET_END) long bucketEnd) throws IOException {
     // ...
}

@ServiceActivator(inputChannel = ChannelNames.TO_S3_SERVICE)
@Retryable(interceptor = RETRY_INTERCEPTOR_BEAN_NAME)
public void sendToS3(@Payload Path path, @Header(ApplicationHeaders.BUCKET_END) long bucketStart,
                     @Header(ApplicationHeaders.DATA_TYPE) String dataType) throws IOException {
     // ...
}

我的自定义 TransactionSynchronizationProcessor(我使用 DefaultTransactionSynchronizationFactory)大致如下实现(未显示的代码从消息有效负载中提取路径并将其存储在 IntegrationResourceHolder 的属性中):

@Override
public void processBeforeCommit(IntegrationResourceHolder holder) {
    updatePaths(holder);
}

@Override
public void processAfterCommit(IntegrationResourceHolder holder) {
    updateAndSend(successChannel, holder);
}

@Override
public void processAfterRollback(IntegrationResourceHolder holder) {
    updateAndSend(failureChannel, holder);
}

我的理解是,因为所有的间隙渠道都是直接渠道,交易也应该包括服务。因为它会在提交之前、提交之后和回滚之后进行更新,所以我希望它在开始时访问消息,获取解压后的路径,然后在最后访问它并获得压缩后的路径——然后尝试采取行动他们。但是,只有解压缩的路径会被提取。

显然,我对交易的应用(和理解)缺少一些东西。我实现所需行为的正确方法是什么?

不对,你有点误会了。

有了 transactionaltransactionSynchronizationFactory,我们只担心 source。作为数据的最终结果对于事务资源并不重要。我们只需要知道提交或回滚事务的状态。

这就是为什么IntegrationResourceHolder有一个message 属性要担心TX端携带源数据。

当我们回滚时,尝试在 TX 的末尾成像一个 DB 事务和数据。对,没有那个数据!只有TX的输入重要。

但是,您可以通过额外的 ResourceHolderResourceHolderSynchronization 来满足您的要求,以便在 S3 存储之前的某个地方使用。简单的 ThreadLocal 也可能有帮助。但一定要在TX结束时正确清除。

从另一边考虑使用来自 s.file 的消息的 headers 携带您需要的一切。为此,AbstractMessageSourceAdvice 会很有用,因为 Spring 集成 4.2.