Spring 集成 - 来自 sftp 入站的聚合器

Spring integration - aggregator from sftp inbound

从包含多个文件的 sftp 入站消息源聚合一条消息的最佳解决方案是什么? 我们在远程机器上有 3 个文件需要接收。之后我们将这些文件的内容组合成一条 json 消息并转发。

public IntegrationFlow sftpIntegrationFlowBean() {
    final Map<String, Object> headers = new HashMap<>();
    headers.put("sftpFile", "sftpFile");
    final Consumer<AggregatorSpec> aggregator = t -> {
        t.sendPartialResultOnExpiry(true);
        t.expireGroupsUponCompletion(true);
        t.processor(new CustomMessageAggregator());
    };
    return IntegrationFlows
            .from(sftpInboundMessageSource(),
                    e -> e.id("sftpIntegrationFlow").poller(pollerMetadataSftp))
            .enrichHeaders(headers).aggregate(aggregator)
            .handle(customMessageSender).get();
}

轮询器每 15 分钟轮询一次。 当 运行 这段代码接下来会发生:

  1. 检索文件并处理其中一个文件
  2. 15 分钟后处理第二个文件
  3. 又过了 15 分钟,处理了第三个文件
  4. 最后在 15 分钟后消息被发送到目的地

这一切如何在一次操作中完成而没有延迟?我确实使用 FileReadingMessageSource 进行了尝试,但结果相同。

提前谢谢你。

增加 maxMessagesPerPollPollerMetadata.