Spring SFTP 出站适配器 - 确定何时发送文件

Spring SFTP Outbound Adapter - determining when files have been sent

我有一个 Spring SFTP 输出适配器,我在主程序中通过“adapter.start()”启动它。启动后,适配器会按预期传输和上传指定目录中的所有文件。但是我想在传输完所有文件后停止适配器。我如何检测是否所有文件都已传输以便我可以发出 adapter.stop()?

@Bean
public IntegrationFlow sftpOutboundFlow() {
    return IntegrationFlows.from(Files.inboundAdapter(new File(sftpOutboundDirectory))
                    .filterExpression("name.endsWith('.pdf') OR name.endsWith('.PDF')")
                    .preventDuplicates(true),
            e -> e.id("sftpOutboundAdapter")
                    .autoStartup(false)
                    .poller(Pollers.trigger(new FireOnceTrigger())
                            .maxMessagesPerPoll(-1)))
            .log(LoggingHandler.Level.INFO, "sftp.outbound", m -> m.getPayload())
            .log(LoggingHandler.Level.INFO, "sftp.outbound", m -> m.getHeaders())
            .handle(Sftp.outboundAdapter(outboundSftpSessionFactory())
                    .useTemporaryFileName(false)
                    .remoteDirectory(sftpRemoteDirectory))
            .get();
}

But I want to stop the adapter after all the files have been transferred.

从逻辑上讲,这不是为这种组件设计的。由于您不会有一些不断变化的本地目录,因此最好考虑一个偶数驱动程序解决方案来通过某些操作列出目录中的文件。是的,它可以是来自 main 的调用,但是对于 dir 的所有内容只调用一次,仅此而已。

因此,带有 Command.MPUTSftp.outboundGateway() 适合您:

https://docs.spring.io/spring-integration/reference/html/sftp.html#using-the-mput-command.

您仍然可以触发 IntegrationFlow,但它可以从 @MessagingGateway 接口开始,从 main 调用本地目录以列出要上传的文件:

https://docs.spring.io/spring-integration/reference/html/dsl.html#java-dsl-gateway

@Artem Bilan 已经给出了答案。但这是他所说内容的具体实现 - 对于像我这样 Spring 集成菜鸟的人:

  1. 定义一个服务来按需获取 PDF 文件:
@Service
public class MyFileService {
    public List<File> getPdfFiles(final String srcDir) {
        File[] files = new File(srcDir).listFiles((dir, name) -> name.toLowerCase().endsWith(".pdf"));
        return Arrays.asList(files == null ? new File[]{} : files);
    }
}
  1. 定义网关以按需启动 SFTP 上传流程:
@MessagingGateway
public interface SFtpOutboundGateway {
    @Gateway(requestChannel = "sftpOutboundFlow.input")
    void uploadFiles(List<File> files);
}
  1. 定义 集成流程 以通过 Sftp.outboundGateway:
  2. 将文件上传到 SFTP 服务器
@Configuration
@EnableIntegration
public class FtpFlowIntegrationConfig {
    // could be also bound via @Value 
    private String sftpRemoteDirectory = "/path/to/remote/dir";

    @Bean
    public SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(22222);
        factory.setUser("client1");
        factory.setPassword("password123");
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<>(factory);
    }

    @Bean
    public IntegrationFlow sftpOutboundFlow(RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate) {
        return e -> e
                .log(LoggingHandler.Level.INFO, "sftp.outbound", Message::getPayload)
                .log(LoggingHandler.Level.INFO, "sftp.outbound", Message::getHeaders)
                .handle(
                    Sftp.outboundGateway(remoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.MPUT, "payload")
                );
    }

    @Bean
    public RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate(SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory) {
        RemoteFileTemplate<ChannelSftp.LsEntry> template = new SftpRemoteFileTemplate(outboundSftpSessionFactory);
        template.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
        template.setAutoCreateDirectory(true);
        template.afterPropertiesSet();
        template.setUseTemporaryFileName(false);
        return template;
    }
}

正在接线:

public class SpringApp {
    public static void main(String[] args) {
        final MyFileService fileService = ctx.getBean(MyFileService.class);
        final SFtpOutboundGateway sFtpOutboundGateway = ctx.getBean(SFtpOutboundGateway.class);
        // trigger the sftp upload flow manually - only once
        sFtpOutboundGateway.uploadFiles(fileService.getPdfFiles()); 
    }
}

导入笔记:

1.

@Gateway(requestChannel = "sftpOutboundFlow.input") void uploadFiles(List files);

此处 DirectChannel 通道 sftpOutboundFlow.input 将用于将带有有效负载 (= List<File> files) 的消息传递给接收方。如果尚未创建此通道,网关将隐式创建它。

2.

@Bean public IntegrationFlow sftpOutboundFlow(RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate) { ... }

由于 IntegrationFlow 是一个 Consumer 功能接口,我们可以使用 IntegrationFlowDefinition 稍微简化流程。在 bean 注册阶段,IntegrationFlowBeanPostProcessor 将此内联 (Lambda) IntegrationFlow 转换为 StandardIntegrationFlow 并处理其组件。使用 Lambda 的 IntegrationFlow 定义将 DirectChannel 填充为流的 inputChannel,并在应用程序上下文中注册为上面示例中名称为 sftpOutboundFlow.input 的 bean(流 bean 名称 +“.input”)。这就是我们为 SFtpOutboundGateway 网关使用该名称的原因。

参考:https://spring.io/blog/2014/11/25/spring-integration-java-dsl-line-by-line-tutorial

3.

@Bean public RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate(SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory) {}

参见:

流程图: