Spring SFTP 使用出站消息网关和 IntegrationFlow 轮询多个目录
Spring SFTP polling multiple directory using Outbound Message Gateway and IntegrationFlow
我想定期获取特定远程目录下的所有文件。应用程序启动后,我只能在该目录下获取一次文件。不确定为什么轮询器不工作。这是在 spring 引导项目中注册的,版本是 2.2.1
@InboundChannelAdapter(value = "sftpReportChannel",
poller = @Poller(fixedDelay = "5000"))
public String filesForGET(){
return "/etl/biq/autoscore/output/report-data/";
}
@Bean
public IntegrationFlow sftpGetFlow(SessionFactory<ChannelSftp.LsEntry> csf) {
return IntegrationFlows.from("sftpReportChannel")
.handle(Sftp.outboundGateway(csf,
AbstractRemoteFileOutboundGateway.Command.LS, "payload")
.options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE, AbstractRemoteFileOutboundGateway.Option.NAME_ONLY)
//Persistent file list filter using the server's file timestamp to detect if we've already 'seen' this file.
.filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "autoscore-meta-data")))
.split()
.log(message -> "file path -> "+message.getPayload())
.handle(Sftp.outboundGateway(csf, AbstractRemoteFileOutboundGateway.Command.GET, "'/etl/biq/autoscore/output/report-data/' + payload")
.options(AbstractRemoteFileOutboundGateway.Option.STREAM))
.handle(new ReportHandler()) //get the payload and create email content and send eamil to recipients
.get();
}
.filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "autoscore-meta-data")))
使其工作方式不会在后续轮询活动中一次又一次地选取相同的文件。
确保在运行时在该远程目录中添加新文件或修改已处理的文件。 SftpPersistentAcceptOnceFileListFilter
逻辑依赖于 LsEntry
的 mtime
属性 来确定文件已被更改,因此可以再次处理。
我想定期获取特定远程目录下的所有文件。应用程序启动后,我只能在该目录下获取一次文件。不确定为什么轮询器不工作。这是在 spring 引导项目中注册的,版本是 2.2.1
@InboundChannelAdapter(value = "sftpReportChannel",
poller = @Poller(fixedDelay = "5000"))
public String filesForGET(){
return "/etl/biq/autoscore/output/report-data/";
}
@Bean
public IntegrationFlow sftpGetFlow(SessionFactory<ChannelSftp.LsEntry> csf) {
return IntegrationFlows.from("sftpReportChannel")
.handle(Sftp.outboundGateway(csf,
AbstractRemoteFileOutboundGateway.Command.LS, "payload")
.options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE, AbstractRemoteFileOutboundGateway.Option.NAME_ONLY)
//Persistent file list filter using the server's file timestamp to detect if we've already 'seen' this file.
.filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "autoscore-meta-data")))
.split()
.log(message -> "file path -> "+message.getPayload())
.handle(Sftp.outboundGateway(csf, AbstractRemoteFileOutboundGateway.Command.GET, "'/etl/biq/autoscore/output/report-data/' + payload")
.options(AbstractRemoteFileOutboundGateway.Option.STREAM))
.handle(new ReportHandler()) //get the payload and create email content and send eamil to recipients
.get();
}
.filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "autoscore-meta-data")))
使其工作方式不会在后续轮询活动中一次又一次地选取相同的文件。
确保在运行时在该远程目录中添加新文件或修改已处理的文件。 SftpPersistentAcceptOnceFileListFilter
逻辑依赖于 LsEntry
的 mtime
属性 来确定文件已被更改,因此可以再次处理。