spring-integration-aws动态文件下载
spring-integration-aws dynamic file download
我需要根据邮件内容从 S3 下载文件。换句话说,要下载的文件以前是未知的,我必须在运行时搜索并找到它。 S3StreamingMessageSource 似乎不太合适,因为:
- 它依赖于轮询,因为我需要等待消息。
- 我找不到任何在流程中动态创建
S3StreamingMessageSource
的方法。 gateway(IntegrationFlow)
看起来很有趣,但我需要的是一个不存在的 gateway(Function<Message<?>, IntegrationFlow>)
。
另一个候选者是 S3MessageHandler,但它不支持列出我需要找到所需文件的文件。
我可以直接使用 AWS API 实现我自己的消息处理程序,只是想知道我是否遗漏了什么,因为这似乎不是一个不寻常的要求。毕竟,并非每个应用程序都只是坐在那里并不断轮询 S3 以获取新文件。
有 S3RemoteFileTemplate
和 list()
功能,您可以在 handle()
中使用。然后 split()
结果并为每个要下载的远程文件调用 S3MessageHandler
。
虽然最后一个有下载整个远程目录的功能。
对于遇到这个问题的任何人,这就是我所做的。诀窍是:
- 稍后设置过滤器,而不是在构建时。请注意,没有
addFilters
或 getFilters
方法,因此过滤器只能设置一次,以后无法添加。 @artem-bilan,这很不方便。
手动调用 S3StreamingMessageSource.receive
。
.handle(String.class, (fileName, h) -> {
if (messageSource instanceof S3StreamingMessageSource) {
S3StreamingMessageSource s3StreamingMessageSource = (S3StreamingMessageSource) messageSource;
ChainFileListFilter<S3ObjectSummary> chainFileListFilter = new ChainFileListFilter<>();
chainFileListFilter.addFilters(
new S3SimplePatternFileListFilter("**/*/*.json.gz"),
new S3PersistentAcceptOnceFileListFilter(metadataStore, ""),
new S3FileListFilter(fileName)
);
s3StreamingMessageSource.setFilter(chainFileListFilter);
return s3StreamingMessageSource.receive();
}
log.warn("Expected: {} but got: {}.",
S3StreamingMessageSource.class.getName(), messageSource.getClass().getName());
return messageSource.receive();
}, spec -> spec
.requiresReply(false) // in case all messages got filtered out
)
我需要根据邮件内容从 S3 下载文件。换句话说,要下载的文件以前是未知的,我必须在运行时搜索并找到它。 S3StreamingMessageSource 似乎不太合适,因为:
- 它依赖于轮询,因为我需要等待消息。
- 我找不到任何在流程中动态创建
S3StreamingMessageSource
的方法。gateway(IntegrationFlow)
看起来很有趣,但我需要的是一个不存在的gateway(Function<Message<?>, IntegrationFlow>)
。
另一个候选者是 S3MessageHandler,但它不支持列出我需要找到所需文件的文件。
我可以直接使用 AWS API 实现我自己的消息处理程序,只是想知道我是否遗漏了什么,因为这似乎不是一个不寻常的要求。毕竟,并非每个应用程序都只是坐在那里并不断轮询 S3 以获取新文件。
有 S3RemoteFileTemplate
和 list()
功能,您可以在 handle()
中使用。然后 split()
结果并为每个要下载的远程文件调用 S3MessageHandler
。
虽然最后一个有下载整个远程目录的功能。
对于遇到这个问题的任何人,这就是我所做的。诀窍是:
- 稍后设置过滤器,而不是在构建时。请注意,没有
addFilters
或getFilters
方法,因此过滤器只能设置一次,以后无法添加。 @artem-bilan,这很不方便。 手动调用
S3StreamingMessageSource.receive
。.handle(String.class, (fileName, h) -> { if (messageSource instanceof S3StreamingMessageSource) { S3StreamingMessageSource s3StreamingMessageSource = (S3StreamingMessageSource) messageSource; ChainFileListFilter<S3ObjectSummary> chainFileListFilter = new ChainFileListFilter<>(); chainFileListFilter.addFilters( new S3SimplePatternFileListFilter("**/*/*.json.gz"), new S3PersistentAcceptOnceFileListFilter(metadataStore, ""), new S3FileListFilter(fileName) ); s3StreamingMessageSource.setFilter(chainFileListFilter); return s3StreamingMessageSource.receive(); } log.warn("Expected: {} but got: {}.", S3StreamingMessageSource.class.getName(), messageSource.getClass().getName()); return messageSource.receive(); }, spec -> spec .requiresReply(false) // in case all messages got filtered out )