Spring 集成 - 来自 sftp 入站的聚合器
Spring integration - aggregator from sftp inbound
从包含多个文件的 sftp 入站消息源聚合一条消息的最佳解决方案是什么?
我们在远程机器上有 3 个文件需要接收。之后我们将这些文件的内容组合成一条 json 消息并转发。
public IntegrationFlow sftpIntegrationFlowBean() {
final Map<String, Object> headers = new HashMap<>();
headers.put("sftpFile", "sftpFile");
final Consumer<AggregatorSpec> aggregator = t -> {
t.sendPartialResultOnExpiry(true);
t.expireGroupsUponCompletion(true);
t.processor(new CustomMessageAggregator());
};
return IntegrationFlows
.from(sftpInboundMessageSource(),
e -> e.id("sftpIntegrationFlow").poller(pollerMetadataSftp))
.enrichHeaders(headers).aggregate(aggregator)
.handle(customMessageSender).get();
}
轮询器每 15 分钟轮询一次。
当 运行 这段代码接下来会发生:
- 检索文件并处理其中一个文件
- 15 分钟后处理第二个文件
- 又过了 15 分钟,处理了第三个文件
- 最后在 15 分钟后消息被发送到目的地
这一切如何在一次操作中完成而没有延迟?我确实使用 FileReadingMessageSource 进行了尝试,但结果相同。
提前谢谢你。
增加 maxMessagesPerPoll
在 PollerMetadata
.
从包含多个文件的 sftp 入站消息源聚合一条消息的最佳解决方案是什么? 我们在远程机器上有 3 个文件需要接收。之后我们将这些文件的内容组合成一条 json 消息并转发。
public IntegrationFlow sftpIntegrationFlowBean() {
final Map<String, Object> headers = new HashMap<>();
headers.put("sftpFile", "sftpFile");
final Consumer<AggregatorSpec> aggregator = t -> {
t.sendPartialResultOnExpiry(true);
t.expireGroupsUponCompletion(true);
t.processor(new CustomMessageAggregator());
};
return IntegrationFlows
.from(sftpInboundMessageSource(),
e -> e.id("sftpIntegrationFlow").poller(pollerMetadataSftp))
.enrichHeaders(headers).aggregate(aggregator)
.handle(customMessageSender).get();
}
轮询器每 15 分钟轮询一次。 当 运行 这段代码接下来会发生:
- 检索文件并处理其中一个文件
- 15 分钟后处理第二个文件
- 又过了 15 分钟,处理了第三个文件
- 最后在 15 分钟后消息被发送到目的地
这一切如何在一次操作中完成而没有延迟?我确实使用 FileReadingMessageSource 进行了尝试,但结果相同。
提前谢谢你。
增加 maxMessagesPerPoll
在 PollerMetadata
.