Recursively read files with Spring Integration SFTP DSL
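I am trying to use Spring Integration SFTP to recursively read files (.txt) from all subfolders on a remote server. The remote folder is something like "/tmp/remoteFolder", and all of its subfolders are date folders such as "/tmp/remoteFolder/20180830" and "/tmp/remoteFolder/20170902".
This is the code I have so far: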
@Bean
@InboundChannelAdapter(value = "sftpMgetInputChannel",
        poller = @Poller(fixedDelay = "5000"))
public IntegrationFlow sftpMGetFlow() {
    return IntegrationFlows.from("sftpMgetInputChannel")
            .handleWithAdapter(h -> h.sftpGateway(this.sftpSessionFactory,
                    Command.MGET, "'/tmp/remoteDirectory/*'")
                .options(Option.RECURSIVE)
                .regexFileNameFilter("((\d{8})|*\.txt)")
                .localDirectoryExpression("sftp-inbound" + "/" + "#remoteDirectory"))
            .channel(remoteFileOutputChannel())
            .get();
}

@Bean
public MessageChannel sftpMgetInboundChannel(){
    return new DirectChannel();
}

@Bean
public PollableChannel remoteFileOutputChannel() {
    return new QueueChannel();
}
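How do I specify /tmp/remoteFolder as the root remote directory for the SFTP mget? Why isn't this working? Why do I need to specify the output channel?
Update: Instead of calling channel(remoteFileOutputChannel()), I am calling a handler like this:
@Bean
public MessageHandler messageHandler(){
    return new MessageHandler() { ... }
}
Updated code: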
@InboundChannelAdapter(value = "sftpMgetInputChannel",
        poller = @Poller(fixedDelay = "5000"))
public String filesForMGET(){
    return "'/tmp/input/remoteDirectory/*'";
}

@Bean
public IntegrationFlow sftpMGetFlow() {
    return IntegrationFlows.from("sftpMgetInputChannel")
            .handleWithAdapter(h -> h.sftpGateway(this.sftpSessionFactory,
                    Command.MGET, "payload")
                .options(Option.RECURSIVE)
                .regexFileNameFilter("((\d{8})|*\.txt)")
                .localDirectoryExpression("'sftp-inbound/'" + "#remoteDirectory"))
            .handler(messageHandler())
            .get();
}

@Bean
public MessageChannel sftpMgetInboundChannel(){
    return new DirectChannel();
}

@Bean
public MessageHandler messageHandler(){
    return new MessageHandler() { ... }
}
With this updated code, I get the following error:
org.springframework.core.NestedIOException: failed to read file; nested exception is 2: No such file
at org.springframework.integration.sftp.session.SftpSession.read(SftpSession.java:100)
at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.read(CachingSessionFactory.java:137)
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.copyFileToLocalDirectory(AbstractInboundFileSynchronizer.java:176)
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:138)
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.receive(AbstractInboundFileSynchronizingMessageSource.java:144)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.doPoll(SourcePollingChannelAdapter.java:89)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.call(AbstractPollingEndpoint.java:146)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.call(AbstractPollingEndpoint.java:144)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:207)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.run(ErrorHandlingTaskExecutor.java:52)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:202)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:51)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:98)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:680)
Caused by: 2: No such file
at com.jcraft.jsch.ChannelSftp.throwStatusError(ChannelSftp.java:2289)
at com.jcraft.jsch.ChannelSftp._stat(ChannelSftp.java:1741)
at com.jcraft.jsch.ChannelSftp.get(ChannelSftp.java:1011)
at com.jcraft.jsch.ChannelSftp.get(ChannelSftp.java:986)
at org.springframework.integration.sftp.session.SftpSession.read(SftpSession.java:96)
... 22 more
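With the expression set to payload (as it was in your question before the edit), the payload of the message sent to the gateway should be /tmp/remoteFolder/*, which is split internally into the remote directory and the remote file name (*).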
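To make the split concrete, here is a minimal sketch in plain Java. It only illustrates the idea of cutting the path at the last slash; the class and method names are hypothetical, and this is not the gateway's actual implementation.

```java
public class MgetPathSplit {

    /**
     * Splits a remote path at the last '/' into
     * { directory (with trailing slash), file name pattern }.
     * Hypothetical helper for illustration only.
     */
    public static String[] split(String remotePath) {
        int lastSlash = remotePath.lastIndexOf('/');
        if (lastSlash < 0) {
            // No directory part at all: the whole string is the pattern
            return new String[] {"", remotePath};
        }
        return new String[] {
            remotePath.substring(0, lastSlash + 1),
            remotePath.substring(lastSlash + 1)
        };
    }

    public static void main(String[] args) {
        String[] parts = split("/tmp/remoteFolder/*");
        System.out.println("directory = " + parts[0]); // directory = /tmp/remoteFolder/
        System.out.println("pattern   = " + parts[1]); // pattern   = *
    }
}
```

In this sketch a payload that ends with a slash, such as /tmp/remoteFolder/, leaves an empty file name pattern, which suggests why the trailing * is part of the expected payload.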
Why do I need to specify the output channel?
The result of the MGET (the list of retrieved files) needs to go somewhere.
EDIT
You misunderstood; you can't put the @InboundChannelAdapter annotation on a flow. You need something like this:
@InboundChannelAdapter(value = "sftpMgetInputChannel",
        poller = @Poller(fixedDelay = "5000"))
public String filesForMGET() {
    // Payload for MGET: the remote directory plus a file name pattern
    return "/tmp/remoteFolder/*";
}

@Bean
public IntegrationFlow sftpMGetFlow() {
    return IntegrationFlows.from("sftpMgetInputChannel")
            .handleWithAdapter(h -> h.sftpGateway(this.sftpSessionFactory,
                    Command.MGET, "payload")
                .options(Option.RECURSIVE)
                // Accept the 8-digit date directories and any .txt file
                .regexFileNameFilter("(\\d{8}|.*\\.txt)")
                // SpEL: string literal concatenated with the #remoteDirectory variable
                .localDirectoryExpression("'sftp-inbound/' + #remoteDirectory"))
            .channel(remoteFileOutputChannel())
            .get();
}
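A side note on the file name filter: the pattern as posted in the question, ((\d{8})|*\.txt), has a bare * directly after the alternation bar, which java.util.regex rejects. The sketch below checks that with the standard library and tries one possible replacement. The replacement pattern and the class name are assumptions for illustration, and the sketch assumes the filter has to match the whole name, which is worth verifying against your Spring Integration version.

```java
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

public class MgetFilterCheck {

    // Pattern from the question, written as a Java string literal
    static final String ORIGINAL = "((\\d{8})|*\\.txt)";

    // Hypothetical replacement: an 8-digit directory name, or any name ending in .txt
    static final String CORRECTED = "(\\d{8}|.*\\.txt)";

    /** Returns true if the regex compiles under java.util.regex. */
    public static boolean compiles(String regex) {
        try {
            Pattern.compile(regex);
            return true;
        } catch (PatternSyntaxException e) {
            return false;
        }
    }

    /** Returns true if the whole name matches the replacement pattern. */
    public static boolean accepts(String name) {
        return Pattern.matches(CORRECTED, name);
    }

    public static void main(String[] args) {
        System.out.println("original compiles:  " + compiles(ORIGINAL));
        System.out.println("corrected compiles: " + compiles(CORRECTED));
        for (String name : new String[] {"20180830", "report.txt", "report.csv"}) {
            System.out.println(name + " accepted: " + accepts(name));
        }
    }
}
```

The directory alternative matters because, with a recursive MGET, the date subfolders themselves have to pass the filter before their contents are visited.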