spring-integration-aws动态文件下载

spring-integration-aws dynamic file download

我需要根据邮件内容从 S3 下载文件。换句话说,要下载的文件以前是未知的,我必须在运行时搜索并找到它。 S3StreamingMessageSource 似乎不太合适,因为:

  1. 它依赖于轮询,因为我需要等待消息。
  2. 我找不到任何在流程中动态创建 S3StreamingMessageSource 的方法。 gateway(IntegrationFlow) 看起来很有趣,但我需要的是一个不存在的 gateway(Function<Message<?>, IntegrationFlow>)

另一个候选者是 S3MessageHandler,但它不支持列出我需要找到所需文件的文件。

我可以直接使用 AWS API 实现我自己的消息处理程序,只是想知道我是否遗漏了什么,因为这似乎不是一个不寻常的要求。毕竟,并非每个应用程序都只是坐在那里并不断轮询 S3 以获取新文件。

S3RemoteFileTemplatelist() 功能,您可以在 handle() 中使用。然后 split() 结果并为每个要下载的远程文件调用 S3MessageHandler

虽然最后一个有下载整个远程目录的功能。

对于遇到这个问题的任何人,这就是我所做的。诀窍是:

  1. 稍后设置过滤器,而不是在构建时。请注意,没有 addFiltersgetFilters 方法,因此过滤器只能设置一次,以后无法添加。 @artem-bilan,这很不方便。
  2. 手动调用 S3StreamingMessageSource.receive

    .handle(String.class, (fileName, h) -> {
    if (messageSource instanceof S3StreamingMessageSource) {
        S3StreamingMessageSource s3StreamingMessageSource = (S3StreamingMessageSource) messageSource;
    
        ChainFileListFilter<S3ObjectSummary> chainFileListFilter = new ChainFileListFilter<>();
        chainFileListFilter.addFilters(
                new S3SimplePatternFileListFilter("**/*/*.json.gz"),
                new S3PersistentAcceptOnceFileListFilter(metadataStore, ""),
                new S3FileListFilter(fileName)
        );
        s3StreamingMessageSource.setFilter(chainFileListFilter);
    
        return s3StreamingMessageSource.receive();
    }
    log.warn("Expected: {} but got: {}.",
            S3StreamingMessageSource.class.getName(), messageSource.getClass().getName());
    return messageSource.receive();
    }, spec -> spec
        .requiresReply(false) // in case all messages got filtered out
    )