Spring Cloud App Starter,sftp 源,递归文件目录

Spring Cloud App Starter, sftp source, recurse a directory for files

我在 Spring 云数据流中使用 SFTP 源,它正在努力获取在 sftp:remote-dir:/home/someone/source 中定义的文件,现在我在远程目录下有很多子文件夹-dir 并且我想递归地获取此目录下与 patten 匹配的所有文件。我正在尝试使用 filename-regex: 但到目前为止它只适用于一个级别。我如何递归地获取我需要的文件。

入站通道适配器不支持递归;使用带有递归 (-R) 的 outbound gateway 和 MGET 命令的自定义源。

文档缺少该选项;固定在 current docs.

我打开了 an issue 来创建一个标准的应用启动器。

编辑

使用 Java DSL...

@SpringBootApplication
@EnableBinding(Source.class)
public class So44710754Application {

    public static void main(String[] args) {
        SpringApplication.run(So44710754Application.class, args);
    }

    // should store in Redis or similar for persistence
    private final ConcurrentMap<String, Boolean> processed = new ConcurrentHashMap<>();

    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from(source(), e -> e.poller(Pollers.fixedDelay(30_000)))
                .handle(gateway())
                        .split()
                        .<File>filter(p -> this.processed.putIfAbsent(p.getAbsolutePath(), true) == null)
                        .transform(Transformers.fileToByteArray())
                        .channel(Source.OUTPUT)
                        .get();
    }

    private MessageSource<String> source() {
        return () -> new GenericMessage<>("foo/*");
    }

    private AbstractRemoteFileOutboundGateway<LsEntry> gateway() {
        AbstractRemoteFileOutboundGateway<LsEntry> gateway = Sftp.outboundGateway(sessionFactory(), "mget", "payload")
                .localDirectory(new File("/tmp/foo"))
                .options(Option.RECURSIVE)
                .get();
        gateway.setFileExistsMode(FileExistsMode.IGNORE);
        return gateway;
    }

    private SessionFactory<LsEntry> sessionFactory() {
        DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
        sf.setHost("10.0.0.3");
        sf.setUser("ftptest");
        sf.setPassword("ftptest");
        sf.setAllowUnknownKeys(true);
        return new CachingSessionFactory<>(sf);
    }

}

并使用 Java 配置...

@SpringBootApplication
@EnableBinding(Source.class)
public class So44710754Application {

    public static void main(String[] args) {
        SpringApplication.run(So44710754Application.class, args);
    }

    @InboundChannelAdapter(channel = "sftpGate", poller = @Poller(fixedDelay = "30000"))
    public String remoteDir() {
        return "foo/*";
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpGate")
    public SftpOutboundGateway mgetGate() {
        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sessionFactory(), "mget", "payload");
        sftpOutboundGateway.setOutputChannelName("splitterChannel");
        sftpOutboundGateway.setFileExistsMode(FileExistsMode.IGNORE);
        sftpOutboundGateway.setLocalDirectory(new File("/tmp/foo"));
        sftpOutboundGateway.setOptions("-R");
        return sftpOutboundGateway;
    }

    @Bean
    @Splitter(inputChannel = "splitterChannel")
    public DefaultMessageSplitter splitter() {
        DefaultMessageSplitter splitter = new DefaultMessageSplitter();
        splitter.setOutputChannelName("filterChannel");
        return splitter;
    }

    // should store in Redis, Zookeeper, or similar for persistence
    private final ConcurrentMap<String, Boolean> processed = new ConcurrentHashMap<>();

    @Filter(inputChannel = "filterChannel", outputChannel = "toBytesChannel")
    public boolean filter(File payload) {
        return this.processed.putIfAbsent(payload.getAbsolutePath(), true) == null;
    }

    @Bean
    @Transformer(inputChannel = "toBytesChannel", outputChannel = Source.OUTPUT)
    public FileToByteArrayTransformer toBytes() {
        FileToByteArrayTransformer transformer = new FileToByteArrayTransformer();
        return transformer;
    }

    private SessionFactory<LsEntry> sessionFactory() {
        DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
        sf.setHost("10.0.0.3");
        sf.setUser("ftptest");
        sf.setPassword("ftptest");
        sf.setAllowUnknownKeys(true);
        return new CachingSessionFactory<>(sf);
    }

}