如何使用 Spring 集成处理大型 zip 文件的副本
How to handle copies of large zip files with Spring Integration
此主题类似于:Spring integration problem when reading file from ftp
但是没有明确的答案,这里的场景也不完全一样。
我在服务器上有一个输入目录,zip 文件从另一个远程服务器复制到该目录中。基本上文件是由外部文件同步工具复制的(在服务器 A 上的源目录到服务器 B 上的目标目录之间),但也可以通过 scp 或 ftp 工具复制以进行测试。
zip 文件由 Spring Integration Zip 模块以这种方式处理:
@Bean
public IntegrationFlow zipFilePollingFlow(
@Value("file:///${zip.inputDir}") File in,
UnZipTransformer unzipTransformer,
UnZipResultSplitter unZipResultSplitter,
TransactionManager pseudoTransactionManager) {
FileInboundChannelAdapterSpec inbound = Files.inboundAdapter(in).autoCreateDirectory(true);
return IntegrationFlows.from(inbound, pm -> pm.poller(p -> p.fixedDelay(5000)
.transactionSynchronizationFactory(zipIntegrationTxSynchronizationFactory())
.transactional(pseudoTransactionManager))) // Zip process is done transactionnaly
.handle(fileCleanService) // Deletes files in another directory
.transform(unzipTransformer)
.split(unZipResultSplitter)
.<File, Boolean>route( ... ) // the rest of the flow
.get();
}
小 zip 文件一切都很好。但是,当大文件为 copied/transferred(数百 MB)时,我们会陷入此错误:
o.s.integration.handler.LoggingHandler : org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is org.springframework.messaging.MessageHandlingException: Failed to apply Zip transformation.; nested exception is java.lang.IllegalStateException: Not a zip file: /.../temp/extract/zip/full-extract-rev.zip
Caused by: org.springframework.messaging.MessageHandlingException: Failed to apply Zip transformation.; nested exception is java.lang.IllegalStateException: Not a zip file: ....
我们如何使用 Spring 集成正确处理大型 zip 文件? Zip 文件转换器是否可以重试该过程,直到源 zip 文件传输完成?
另一种方法是将 FileInboundChannelAdapterSpec
配置为 LastModifiedFileListFilter
:
* The {@link FileListFilter} implementation to filter those files which
* {@link File#lastModified()} is less than the {@link #age} in comparison
* with the current time.
所以,只有当文件足够大时,它才会让文件通过。
您还需要将此过滤器与 AcceptOnceFileListFilter
组合成 ChainFileListFilter
以避免重复读取同一个文件。
不确定是否有一个选项可以配置文件系统上的目录以只允许对文件进行独占访问(嗯,默认情况下它在 Windows 上),这样我们就可以编写自定义过滤器并尝试打开文件。如果我们失败了,我们不会将文件路径下移到 ChainFileListFilter
中的 AcceptOnceFileListFilter
:https://docs.spring.io/spring-integration/docs/current/reference/html/file.html#file-reading
此主题类似于:Spring integration problem when reading file from ftp 但是没有明确的答案,这里的场景也不完全一样。
我在服务器上有一个输入目录,zip 文件从另一个远程服务器复制到该目录中。基本上文件是由外部文件同步工具复制的(在服务器 A 上的源目录到服务器 B 上的目标目录之间),但也可以通过 scp 或 ftp 工具复制以进行测试。
zip 文件由 Spring Integration Zip 模块以这种方式处理:
@Bean
public IntegrationFlow zipFilePollingFlow(
@Value("file:///${zip.inputDir}") File in,
UnZipTransformer unzipTransformer,
UnZipResultSplitter unZipResultSplitter,
TransactionManager pseudoTransactionManager) {
FileInboundChannelAdapterSpec inbound = Files.inboundAdapter(in).autoCreateDirectory(true);
return IntegrationFlows.from(inbound, pm -> pm.poller(p -> p.fixedDelay(5000)
.transactionSynchronizationFactory(zipIntegrationTxSynchronizationFactory())
.transactional(pseudoTransactionManager))) // Zip process is done transactionnaly
.handle(fileCleanService) // Deletes files in another directory
.transform(unzipTransformer)
.split(unZipResultSplitter)
.<File, Boolean>route( ... ) // the rest of the flow
.get();
}
小 zip 文件一切都很好。但是,当大文件为 copied/transferred(数百 MB)时,我们会陷入此错误:
o.s.integration.handler.LoggingHandler : org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is org.springframework.messaging.MessageHandlingException: Failed to apply Zip transformation.; nested exception is java.lang.IllegalStateException: Not a zip file: /.../temp/extract/zip/full-extract-rev.zip
Caused by: org.springframework.messaging.MessageHandlingException: Failed to apply Zip transformation.; nested exception is java.lang.IllegalStateException: Not a zip file: ....
我们如何使用 Spring 集成正确处理大型 zip 文件? Zip 文件转换器是否可以重试该过程,直到源 zip 文件传输完成?
另一种方法是将 FileInboundChannelAdapterSpec
配置为 LastModifiedFileListFilter
:
* The {@link FileListFilter} implementation to filter those files which
* {@link File#lastModified()} is less than the {@link #age} in comparison
* with the current time.
所以,只有当文件足够大时,它才会让文件通过。
您还需要将此过滤器与 AcceptOnceFileListFilter
组合成 ChainFileListFilter
以避免重复读取同一个文件。
不确定是否有一个选项可以配置文件系统上的目录以只允许对文件进行独占访问(嗯,默认情况下它在 Windows 上),这样我们就可以编写自定义过滤器并尝试打开文件。如果我们失败了,我们不会将文件路径下移到 ChainFileListFilter
中的 AcceptOnceFileListFilter
:https://docs.spring.io/spring-integration/docs/current/reference/html/file.html#file-reading