Spring Cloud App Starter,sftp 源,递归文件目录
Spring Cloud App Starter, sftp source, recurse a directory for files
我在 Spring 云数据流中使用 SFTP 源,它正在努力获取在 sftp:remote-dir:/home/someone/source 中定义的文件,现在我在远程目录下有很多子文件夹-dir 并且我想递归地获取此目录下与 patten 匹配的所有文件。我正在尝试使用 filename-regex: 但到目前为止它只适用于一个级别。我如何递归地获取我需要的文件。
入站通道适配器不支持递归;使用带有递归 (-R
) 的 outbound gateway 和 MGET 命令的自定义源。
文档缺少该选项;固定在 current docs.
我打开了 an issue 来创建一个标准的应用启动器。
编辑
使用 Java DSL...
@SpringBootApplication
@EnableBinding(Source.class)
public class So44710754Application {
public static void main(String[] args) {
SpringApplication.run(So44710754Application.class, args);
}
// should store in Redis or similar for persistence
private final ConcurrentMap<String, Boolean> processed = new ConcurrentHashMap<>();
@Bean
public IntegrationFlow flow() {
return IntegrationFlows.from(source(), e -> e.poller(Pollers.fixedDelay(30_000)))
.handle(gateway())
.split()
.<File>filter(p -> this.processed.putIfAbsent(p.getAbsolutePath(), true) == null)
.transform(Transformers.fileToByteArray())
.channel(Source.OUTPUT)
.get();
}
private MessageSource<String> source() {
return () -> new GenericMessage<>("foo/*");
}
private AbstractRemoteFileOutboundGateway<LsEntry> gateway() {
AbstractRemoteFileOutboundGateway<LsEntry> gateway = Sftp.outboundGateway(sessionFactory(), "mget", "payload")
.localDirectory(new File("/tmp/foo"))
.options(Option.RECURSIVE)
.get();
gateway.setFileExistsMode(FileExistsMode.IGNORE);
return gateway;
}
private SessionFactory<LsEntry> sessionFactory() {
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost("10.0.0.3");
sf.setUser("ftptest");
sf.setPassword("ftptest");
sf.setAllowUnknownKeys(true);
return new CachingSessionFactory<>(sf);
}
}
并使用 Java 配置...
@SpringBootApplication
@EnableBinding(Source.class)
public class So44710754Application {
public static void main(String[] args) {
SpringApplication.run(So44710754Application.class, args);
}
@InboundChannelAdapter(channel = "sftpGate", poller = @Poller(fixedDelay = "30000"))
public String remoteDir() {
return "foo/*";
}
@Bean
@ServiceActivator(inputChannel = "sftpGate")
public SftpOutboundGateway mgetGate() {
SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sessionFactory(), "mget", "payload");
sftpOutboundGateway.setOutputChannelName("splitterChannel");
sftpOutboundGateway.setFileExistsMode(FileExistsMode.IGNORE);
sftpOutboundGateway.setLocalDirectory(new File("/tmp/foo"));
sftpOutboundGateway.setOptions("-R");
return sftpOutboundGateway;
}
@Bean
@Splitter(inputChannel = "splitterChannel")
public DefaultMessageSplitter splitter() {
DefaultMessageSplitter splitter = new DefaultMessageSplitter();
splitter.setOutputChannelName("filterChannel");
return splitter;
}
// should store in Redis, Zookeeper, or similar for persistence
private final ConcurrentMap<String, Boolean> processed = new ConcurrentHashMap<>();
@Filter(inputChannel = "filterChannel", outputChannel = "toBytesChannel")
public boolean filter(File payload) {
return this.processed.putIfAbsent(payload.getAbsolutePath(), true) == null;
}
@Bean
@Transformer(inputChannel = "toBytesChannel", outputChannel = Source.OUTPUT)
public FileToByteArrayTransformer toBytes() {
FileToByteArrayTransformer transformer = new FileToByteArrayTransformer();
return transformer;
}
private SessionFactory<LsEntry> sessionFactory() {
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost("10.0.0.3");
sf.setUser("ftptest");
sf.setPassword("ftptest");
sf.setAllowUnknownKeys(true);
return new CachingSessionFactory<>(sf);
}
}
我在 Spring 云数据流中使用 SFTP 源,它正在努力获取在 sftp:remote-dir:/home/someone/source 中定义的文件,现在我在远程目录下有很多子文件夹-dir 并且我想递归地获取此目录下与 patten 匹配的所有文件。我正在尝试使用 filename-regex: 但到目前为止它只适用于一个级别。我如何递归地获取我需要的文件。
入站通道适配器不支持递归;使用带有递归 (-R
) 的 outbound gateway 和 MGET 命令的自定义源。
文档缺少该选项;固定在 current docs.
我打开了 an issue 来创建一个标准的应用启动器。
编辑
使用 Java DSL...
@SpringBootApplication
@EnableBinding(Source.class)
public class So44710754Application {
public static void main(String[] args) {
SpringApplication.run(So44710754Application.class, args);
}
// should store in Redis or similar for persistence
private final ConcurrentMap<String, Boolean> processed = new ConcurrentHashMap<>();
@Bean
public IntegrationFlow flow() {
return IntegrationFlows.from(source(), e -> e.poller(Pollers.fixedDelay(30_000)))
.handle(gateway())
.split()
.<File>filter(p -> this.processed.putIfAbsent(p.getAbsolutePath(), true) == null)
.transform(Transformers.fileToByteArray())
.channel(Source.OUTPUT)
.get();
}
private MessageSource<String> source() {
return () -> new GenericMessage<>("foo/*");
}
private AbstractRemoteFileOutboundGateway<LsEntry> gateway() {
AbstractRemoteFileOutboundGateway<LsEntry> gateway = Sftp.outboundGateway(sessionFactory(), "mget", "payload")
.localDirectory(new File("/tmp/foo"))
.options(Option.RECURSIVE)
.get();
gateway.setFileExistsMode(FileExistsMode.IGNORE);
return gateway;
}
private SessionFactory<LsEntry> sessionFactory() {
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost("10.0.0.3");
sf.setUser("ftptest");
sf.setPassword("ftptest");
sf.setAllowUnknownKeys(true);
return new CachingSessionFactory<>(sf);
}
}
并使用 Java 配置...
@SpringBootApplication
@EnableBinding(Source.class)
public class So44710754Application {
public static void main(String[] args) {
SpringApplication.run(So44710754Application.class, args);
}
@InboundChannelAdapter(channel = "sftpGate", poller = @Poller(fixedDelay = "30000"))
public String remoteDir() {
return "foo/*";
}
@Bean
@ServiceActivator(inputChannel = "sftpGate")
public SftpOutboundGateway mgetGate() {
SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sessionFactory(), "mget", "payload");
sftpOutboundGateway.setOutputChannelName("splitterChannel");
sftpOutboundGateway.setFileExistsMode(FileExistsMode.IGNORE);
sftpOutboundGateway.setLocalDirectory(new File("/tmp/foo"));
sftpOutboundGateway.setOptions("-R");
return sftpOutboundGateway;
}
@Bean
@Splitter(inputChannel = "splitterChannel")
public DefaultMessageSplitter splitter() {
DefaultMessageSplitter splitter = new DefaultMessageSplitter();
splitter.setOutputChannelName("filterChannel");
return splitter;
}
// should store in Redis, Zookeeper, or similar for persistence
private final ConcurrentMap<String, Boolean> processed = new ConcurrentHashMap<>();
@Filter(inputChannel = "filterChannel", outputChannel = "toBytesChannel")
public boolean filter(File payload) {
return this.processed.putIfAbsent(payload.getAbsolutePath(), true) == null;
}
@Bean
@Transformer(inputChannel = "toBytesChannel", outputChannel = Source.OUTPUT)
public FileToByteArrayTransformer toBytes() {
FileToByteArrayTransformer transformer = new FileToByteArrayTransformer();
return transformer;
}
private SessionFactory<LsEntry> sessionFactory() {
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost("10.0.0.3");
sf.setUser("ftptest");
sf.setPassword("ftptest");
sf.setAllowUnknownKeys(true);
return new CachingSessionFactory<>(sf);
}
}