Spring 带删除的 SFTP 出站集成 DSL
Spring Integration DSL for SFTP Outbound with delete
我正在使用
- Sprint 集成(文件、SFTP 等)4.3.6
- Spring 引导 1.4.3
- Spring 集成 Java DSL 1.1.4
并且我正在尝试设置一个 SFTP 出站适配器,它允许我将文件移动到远程系统上的目录,并在我的本地系统中删除或重命名该文件。
因此,例如我想将文件 a.txt 放置在本地目录中,并将其通过 SFTP 传输到目录中的远程服务器入站。传输完成后,我想删除或重命名 a.txt 的本地副本。
我正在尝试几种方法来解决这个问题。所以这里是我测试用的常用SessionFactory
protected SessionFactory<ChannelSftp.LsEntry> buildSftpSessionFactory() {
DefaultSftpSessionFactory sessionFactory = new DefaultSftpSessionFactory();
sessionFactory.setHost("localhost");
sessionFactory.setUser("user");
sessionFactory.setAllowUnknownKeys(true);
sessionFactory.setPassword("pass");
CachingSessionFactory<ChannelSftp.LsEntry> cachingSessionFactory = new CachingSessionFactory<>(sessionFactory, 1);
return cachingSessionFactory;
}
这是一个转换器,我必须将一些 headers 添加到消息中
@Override
public Message<File> transform(Message<File> source) {
System.out.println("here is the thing : "+source);
File file = (File)source.getPayload();
Message<File> transformedMessage = MessageBuilder.withPayload(file)
.copyHeaders(source.getHeaders())
.setHeaderIfAbsent(FileHeaders.ORIGINAL_FILE, file)
.setHeaderIfAbsent(FileHeaders.FILENAME, file.getName())
.build();
return transformedMessage;
}
然后我有一个集成流程,它使用轮询器来监视本地目录并调用它:
@Bean
public IntegrationFlow pushTheFile(){
return IntegrationFlows
.from(s -> s.file(new File(DIR_TO_WATCH))
.patternFilter("*.txt").preventDuplicates(),
e -> e.poller(Pollers.fixedDelay(100)))
.transform(outboundTransformer)
.handle(Sftp.outboundAdapter(this.buildSftpSessionFactory())
.remoteFileSeparator("/")
.useTemporaryFileName(false)
.remoteDirectory("inbound/")
)
.get();
}
这工作正常,但保留了本地文件。关于上传完成后如何删除本地文件的任何想法?我应该看看 SftpOutboundGateway
吗?
提前致谢!
Artem 的回答非常有效!这是一个在推送后删除本地文件的快速示例。
@Bean
public IntegrationFlow pushTheFile(){
return IntegrationFlows
.from(s -> s.file(new File(DIR_TO_WATCH))
.patternFilter("*.txt").preventDuplicates(),
e -> e.poller(Pollers.fixedDelay(100)))
.transform(outboundTransformer)
.handle(Sftp.outboundAdapter(this.buildSftpSessionFactory())
.remoteFileSeparator("/")
.useTemporaryFileName(false)
.remoteDirectory("inbound/"), c -> c.advice(expressionAdvice(c))
)
.get();
}
@Bean
public Advice expressionAdvice(GenericEndpointSpec<FileTransferringMessageHandler<ChannelSftp.LsEntry>> c) {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression("payload.delete()");
advice.setOnFailureExpression("payload + ' failed to upload'");
advice.setTrapException(true);
return advice;
}
为此,您可以使用多种方法。
所有这些都是基于您对 Sftp.outboundAdapter()
的原始请求消息做了其他事情这一事实。
.publishSubscribeChannel()
允许您向多个订阅者发送相同的消息,当第二个订阅者收到它时,只有第一个订阅者完成它的工作。默认情况下,如果您不指定 Executor
routeToRecipients()
允许您通过不同的组件获得相同的结果。
ExpressionEvaluatingRequestHandlerAdvice
- 你将这个添加到 Sftp.outboundAdapter()
端点定义的 .advice()
- 第二个 .handle()
参数并执行 file.delete()
通过 onSuccessExpression
:
.transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice()))
我正在使用
- Sprint 集成(文件、SFTP 等)4.3.6
- Spring 引导 1.4.3
- Spring 集成 Java DSL 1.1.4
并且我正在尝试设置一个 SFTP 出站适配器,它允许我将文件移动到远程系统上的目录,并在我的本地系统中删除或重命名该文件。
因此,例如我想将文件 a.txt 放置在本地目录中,并将其通过 SFTP 传输到目录中的远程服务器入站。传输完成后,我想删除或重命名 a.txt 的本地副本。
我正在尝试几种方法来解决这个问题。所以这里是我测试用的常用SessionFactory
protected SessionFactory<ChannelSftp.LsEntry> buildSftpSessionFactory() {
DefaultSftpSessionFactory sessionFactory = new DefaultSftpSessionFactory();
sessionFactory.setHost("localhost");
sessionFactory.setUser("user");
sessionFactory.setAllowUnknownKeys(true);
sessionFactory.setPassword("pass");
CachingSessionFactory<ChannelSftp.LsEntry> cachingSessionFactory = new CachingSessionFactory<>(sessionFactory, 1);
return cachingSessionFactory;
}
这是一个转换器,我必须将一些 headers 添加到消息中
@Override
public Message<File> transform(Message<File> source) {
System.out.println("here is the thing : "+source);
File file = (File)source.getPayload();
Message<File> transformedMessage = MessageBuilder.withPayload(file)
.copyHeaders(source.getHeaders())
.setHeaderIfAbsent(FileHeaders.ORIGINAL_FILE, file)
.setHeaderIfAbsent(FileHeaders.FILENAME, file.getName())
.build();
return transformedMessage;
}
然后我有一个集成流程,它使用轮询器来监视本地目录并调用它:
@Bean
public IntegrationFlow pushTheFile(){
return IntegrationFlows
.from(s -> s.file(new File(DIR_TO_WATCH))
.patternFilter("*.txt").preventDuplicates(),
e -> e.poller(Pollers.fixedDelay(100)))
.transform(outboundTransformer)
.handle(Sftp.outboundAdapter(this.buildSftpSessionFactory())
.remoteFileSeparator("/")
.useTemporaryFileName(false)
.remoteDirectory("inbound/")
)
.get();
}
这工作正常,但保留了本地文件。关于上传完成后如何删除本地文件的任何想法?我应该看看 SftpOutboundGateway
吗?
提前致谢!
Artem 的回答非常有效!这是一个在推送后删除本地文件的快速示例。
@Bean
public IntegrationFlow pushTheFile(){
return IntegrationFlows
.from(s -> s.file(new File(DIR_TO_WATCH))
.patternFilter("*.txt").preventDuplicates(),
e -> e.poller(Pollers.fixedDelay(100)))
.transform(outboundTransformer)
.handle(Sftp.outboundAdapter(this.buildSftpSessionFactory())
.remoteFileSeparator("/")
.useTemporaryFileName(false)
.remoteDirectory("inbound/"), c -> c.advice(expressionAdvice(c))
)
.get();
}
@Bean
public Advice expressionAdvice(GenericEndpointSpec<FileTransferringMessageHandler<ChannelSftp.LsEntry>> c) {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression("payload.delete()");
advice.setOnFailureExpression("payload + ' failed to upload'");
advice.setTrapException(true);
return advice;
}
为此,您可以使用多种方法。
所有这些都是基于您对 Sftp.outboundAdapter()
的原始请求消息做了其他事情这一事实。
.publishSubscribeChannel()
允许您向多个订阅者发送相同的消息,当第二个订阅者收到它时,只有第一个订阅者完成它的工作。默认情况下,如果您不指定Executor
routeToRecipients()
允许您通过不同的组件获得相同的结果。ExpressionEvaluatingRequestHandlerAdvice
- 你将这个添加到Sftp.outboundAdapter()
端点定义的.advice()
- 第二个.handle()
参数并执行file.delete()
通过onSuccessExpression
:.transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice()))