Spring 带删除的 SFTP 出站集成 DSL

Spring Integration DSL for SFTP Outbound with delete

我正在使用

并且我正在尝试设置一个 SFTP 出站适配器,它允许我将文件移动到远程系统上的目录,并在我的本地系统中删除或重命名该文件。

因此,例如我想将文件 a.txt 放置在本地目录中,并将其通过 SFTP 传输到目录中的远程服务器入站。传输完成后,我想删除或重命名 a.txt 的本地副本。

我正在尝试几种方法来解决这个问题。所以这里是我测试用的常用SessionFactory

protected SessionFactory<ChannelSftp.LsEntry> buildSftpSessionFactory() {
    DefaultSftpSessionFactory sessionFactory = new DefaultSftpSessionFactory();
    sessionFactory.setHost("localhost");
    sessionFactory.setUser("user");
    sessionFactory.setAllowUnknownKeys(true);
    sessionFactory.setPassword("pass");
    CachingSessionFactory<ChannelSftp.LsEntry> cachingSessionFactory = new CachingSessionFactory<>(sessionFactory, 1);
    return cachingSessionFactory;
}

这是一个转换器,我必须将一些 headers 添加到消息中

@Override
public Message<File> transform(Message<File> source) {
    System.out.println("here is the thing : "+source);
    File file = (File)source.getPayload();
    Message<File> transformedMessage = MessageBuilder.withPayload(file)
            .copyHeaders(source.getHeaders())
            .setHeaderIfAbsent(FileHeaders.ORIGINAL_FILE, file)
            .setHeaderIfAbsent(FileHeaders.FILENAME, file.getName())
            .build();
    return transformedMessage;
}

然后我有一个集成流程,它使用轮询器来监视本地目录并调用它:

@Bean
public IntegrationFlow pushTheFile(){
    return IntegrationFlows
            .from(s -> s.file(new File(DIR_TO_WATCH))
                            .patternFilter("*.txt").preventDuplicates(),
                    e -> e.poller(Pollers.fixedDelay(100)))
            .transform(outboundTransformer)
            .handle(Sftp.outboundAdapter(this.buildSftpSessionFactory())
                    .remoteFileSeparator("/")
                    .useTemporaryFileName(false)
                    .remoteDirectory("inbound/")
            )
            .get();
}

这工作正常,但保留了本地文件。关于上传完成后如何删除本地文件的任何想法?我应该看看 SftpOutboundGateway 吗?

提前致谢!

Artem 的回答非常有效!这是一个在推送后删除本地文件的快速示例。

@Bean
public IntegrationFlow pushTheFile(){
    return IntegrationFlows
            .from(s -> s.file(new File(DIR_TO_WATCH))
                            .patternFilter("*.txt").preventDuplicates(),
                    e -> e.poller(Pollers.fixedDelay(100)))
            .transform(outboundTransformer)
            .handle(Sftp.outboundAdapter(this.buildSftpSessionFactory())
                    .remoteFileSeparator("/")
                    .useTemporaryFileName(false)
                    .remoteDirectory("inbound/"), c -> c.advice(expressionAdvice(c))
            )
            .get();
}

@Bean
public Advice expressionAdvice(GenericEndpointSpec<FileTransferringMessageHandler<ChannelSftp.LsEntry>> c) {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setOnSuccessExpression("payload.delete()");
    advice.setOnFailureExpression("payload + ' failed to upload'");
    advice.setTrapException(true);
    return advice;
}

为此,您可以使用多种方法。

所有这些都是基于您对 Sftp.outboundAdapter() 的原始请求消息做了其他事情这一事实。

  1. .publishSubscribeChannel() 允许您向多个订阅者发送相同的消息,当第二个订阅者收到它时,只有第一个订阅者完成它的工作。默认情况下,如果您不指定 Executor

  2. routeToRecipients() 允许您通过不同的组件获得相同的结果。

  3. ExpressionEvaluatingRequestHandlerAdvice - 你将这个添加到 Sftp.outboundAdapter() 端点定义的 .advice() - 第二个 .handle() 参数并执行 file.delete() 通过 onSuccessExpression:

    .transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice()))