如何在基于SftpRemoteFileTemplate的SFTP场景中集成一个MessageHandler?

How to integrate a MessageHandler into the SFTP scenario based on SftpRemoteFileTemplate?

我已经基于 Spring 的集成包中的 SftpRemoteFileTemplate 实现了从 SFTP 服务器获取文件、将文件放入和删除文件的服务。

此处 sftpGetPayload 从 SFTP 服务器获取文件并传送其内容。

到目前为止,这是我的代码:

public String sftpGetPayload(String sessionId,
        String host, int port, String user, String password,
        String remoteDir, String remoteFilename, boolean remoteRemove) {
    LOG.info("sftpGetPayload sessionId={}", sessionId);
    LOG.debug("sftpGetPayLoad host={}, port={}, user={}", host, port, user);
    LOG.debug("sftpGetPayload remoteDir={}, remoteFilename={}, remoteRemove={}",
            remoteDir, remoteFilename, remoteRemove);

    final AtomicReference<String> refPayload = new AtomicReference<>();

    SftpRemoteFileTemplate template = getSftpRemoteFileTemplate(host, port,
            user, password, remoteDir, remoteFilename);

    template.get(remoteDir + "/" + remoteFilename,
            is -> refPayload.set(getAsString(is)));
    LOG.info("sftpGetToFile {} read.", remoteDir + "/" + remoteFilename);

    deleteRemoteFile(template, remoteDir, remoteFilename, remoteRemove);

    return refPayload.get();
}

private SftpRemoteFileTemplate getSftpRemoteFileTemplate(String host, int port,
        String user, String password, String remoteDir, String remoteFilename) {
    SftpRemoteFileTemplate template =
            new SftpRemoteFileTemplate(sftpSessionFactory(host, port, user, password));
    template.setFileNameExpression(
            new LiteralExpression(remoteDir + "/" + remoteFilename));
    template.setRemoteDirectoryExpression(new LiteralExpression(remoteDir));
    //template.afterPropertiesSet();

    return template;
}

private void deleteRemoteFile(SftpRemoteFileTemplate template,
        String remoteDir, String remoteFilename, boolean remoteRemove) {
    LOG.debug("deleteRemoteFile remoteRemove={}", remoteRemove);
    if (remoteRemove) {
        template.remove(remoteDir + "/" + remoteFilename);
        LOG.info("sftpGetToFile {} removed.", remoteDir + "/" + remoteFilename);
    }
}

所有这些 GET 操作都是主动操作,这意味着要获取的文件被认为已经存在。我想要一种轮询过程,一旦在 SFTP 服务器上收到文件,它就会调用我的有效负载消耗方法。

我找到了另一个基于Spring beans的实现,配置为Spring Integration Dsl,它声明了一个SftpSessionFactory,一个 SftpInboundFileSynchronizer、一个 SftpMessageSource 和一个 MessageHandler,它轮询 SFTP 站点以接收文件并自动启动消息处理程序进一步处理。

这段代码如下:

    @Bean
public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
    factory.setHost(myHost);
    factory.setPort(myPort);
    factory.setUser(myUser);
    factory.setPassword(myPassword);
    factory.setAllowUnknownKeys(true);
    return new CachingSessionFactory<LsEntry>(factory);
}

@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
    SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
    fileSynchronizer.setDeleteRemoteFiles(false);
    fileSynchronizer.setRemoteDirectory(myRemotePath);
    fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter(myFileFilter));
    return fileSynchronizer;
}

@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
    SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(
            sftpInboundFileSynchronizer());
    source.setLocalDirectory(myLocalDirectory);
    source.setAutoCreateLocalDirectory(true);
    source.setLocalFilter(new AcceptOnceFileListFilter<File>());
    return source;
}

@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
    return new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println(message.getPayload());
        }

    };
}

如何将这个@Poller/MessageHandler/@ServiceActivator 概念包含到我上面的实现中?或者有没有办法在基于模板的实现中实现这个特性?

场景可能如下:

我有一个 Spring 引导应用程序,其中有几个 类 代表任务。其中一些任务是通过 Spring @Scheduled 注释和 CRON 规范自动调用的,其他任务则不是。

    @Scheduled(cron = "${task.to.start.automatically.frequency}")
    public void runAsTask() {
        ...
    }

第一个任务将从 ist @Sheduled 规范开始,从 SFTP 服务器获取文件并处理它。它将使用自己的通道(host1、port1、user1、password1、remoteDir1、remoteFile1)执行此操作。 第二个任务也将由调度程序 运行 生成一些东西放到 SFTP 服务器上。它将使用自己的通道(host2、port2、user2、password2、remoteDir2、remoteFile2)执行此操作。很可能 host2 = host1 和 port2 = port1,但这不是必须的。 第三个任务也将由调度程序 运行 生成一些东西放到 SFTP 服务器上。它将使用与 task1 相同的通道来执行此操作,但此任务是生产者(而不是像 task1 那样的消费者)并将写入除 task1 之外的另一个文件(host1、port1、user1、password1、remoteDir3、remoteFile3)。 任务四没有 @Scheduled 注释,因为它应该意识到何时从第三方接收到它必须处理的文件,因此可以在其通道(host4、port4、user4、password4、remoteDir4、remoteFile4)上获取其内容以进行处理它。

我已经阅读了整个集成内容,但是很难针对这个用例进行转换,无论是从 XML 配置方案到带注释的 Java 配置,还是通过静态 Spring 在 运行 时间将 bean 方法转换为动态方法。

我了解使用 IntegrationFlow 来注册人工制品,task1 的入站适配器,task2 的出站适配器,task3 的入站适配器与 task1 的会话工厂相同(在其他任何地方注册),以及 - 最后但尤其重要的是 - 具有 task4 轮询器功能的入站适配器。 或者它们都应该是具有命令功能的网关吗?或者我应该注册 SftpRemoteFileTemplate 吗?

要定义频道我有:

public class TransferChannel {

    private String host;
    private int port;
    private String user;
    private String password;

    /* getters, setters, hash, equals, and toString */

}

要将所有 SFTP 设置放在一起,我有:

public class TransferContext {

    private boolean enabled;
    private TransferChannel channel;
    private String remoteDir;
    private String remoteFilename;
    private boolean remoteRemove;
    private String remoteFilenameFilter;
    private String localDir;

    /* getters, setters, hash, equals, and toString */

}

作为 SFTP 处理的核心,每个作业都会注入一种 DynamicSftpAdapter:

    @Scheduled(cron = "${task.to.start.automatically.frequency}")
    public void runAsTask() {

        @Autowired
        DynamicSftpAdapter sftp;

        ...

        sftp.connect("Task1", context);
        File f = sftp.getFile("Task1", "remoteDir", "remoteFile");

        /* process file content */

        sftp.removeFile("Task1", "remoteDir", "remoteFile");
        sftp.disconnect("Task1", context);
    }

DynamicSftpAdapter 还只是一个片段:

@Component
public class DynamicSftpAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(DynamicTcpServer.class);

    @Autowired
    private IntegrationFlowContext flowContext;

    @Autowired
    private ApplicationContext appContext;

    private final Map<TransferChannel, IntegrationFlowRegistration> registrations = new HashMap<>();

    private final Map<String, TransferContext> sessions = new ConcurrentHashMap<>();

    @Override
    public void connect(String sessionId, TransferContext context) {
        if (this.registrations.containsKey(context.getChannel())) {
            LOG.debug("connect, channel exists for {}", sessionId);         
        }
        else {
            // register the required SFTP Outbound Adapter
            TransferChannel channel = context.getChannel();
            IntegrationFlow flow = f -> f.handle(Sftp.outboundAdapter(cashedSftpSessionFactory(
                channel.getHost(), channel.getPort(),
                channel.getUser(), channel.getPassword())));
            this.registrations.put(channel, flowContext.registration(flow).register());
            this.sessions.put(sessionId, context);
            LOG.info("sftp session {} for {} started", sessionId, context);
        }
    }

    private DefaultSftpSessionFactory sftpSessionFactory(String host, int port, String user, String password) {
        LOG.debug("sftpSessionFactory");
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost(host);
        factory.setPort(port);
        factory.setUser(user);
        factory.setPassword(password);
        factory.setAllowUnknownKeys(true);
        return factory;
    }

    private CachingSessionFactory<LsEntry> cashedSftpSessionFactory(String host, int port, String user, String password) {
        LOG.debug("cashedSftpSessionFactory");
        CachingSessionFactory<LsEntry> cashedSessionFactory =
            new CachingSessionFactory<LsEntry>(
                sftpSessionFactory(host, port, user, password));
        return cashedSessionFactory;
    }

    @Override
    public void sftpGetFile(String sessionId, String remoteDir, String remoteFilename) {
        TransferContext context = sessions.get(sessionId);
        if (context == null)
            throw new IllegalStateException("Session not established, sessionId " + sessionId);

        IntegrationFlowRegistration register = registrations.get(context.getChannel());
        if (register != null) {
            try {
                LOG.debug("sftpGetFile get file {}", remoteDir + "/" + remoteFilename);
                register.getMessagingTemplate().send(
                    MessageBuilder.withPayload(msg)
                        .setHeader(...).build());       
            }
            catch (Exception e) {
                appContext.getBean(context, DefaultSftpSessionFactory.class)
                    .close();   
            }
        }
    }

    @Override
    public void disconnect(String sessionId, TransferContext context) {
        IntegrationFlowRegistration registration = this.registrations.remove(context.getChannel());
        if (registration != null) {
            registration.destroy();
        }
        LOG.info("sftp session for {} finished", context);
    }
}

我不知道如何启动 SFTP 命令。当使用 OutboundGateway 并且必须立即指定 SFTP 命令(如 GET)时,我也没有得到,那么整个 SFTP 处理是否会在一个方法中,指定出站网关工厂并使用 get() 获取实例并可能调用消息 .get() 以任何方式。

显然我需要帮助。

首先,如果您已经在使用 Spring 集成通道适配器,则可能没有理由直接使用像 RemoteFileTemplate 这样的低级 API。

其次存在技术差异:SftpInboundFileSynchronizingMessageSource 将生成本地文件 - 远程文件的完整副本。因此,当我们在下游处理您的 SftpRemoteFileTemplate 逻辑时,它不会很好地工作,因为我们已经带来了一个本地文件 (java.io.File),而不是远程文件表示的实体。

即使您在 sftpGetPayload() 中的逻辑看起来不像需要这样一个单独的方法那么复杂和自定义,最好将 SftpRemoteFileTemplate 作为单例并共享它当您使用同一个 SFTP 服务器时,不同组件之间。它只是无状态的直接 Spring 模板模式实现。

如果您仍然坚持使用上述集成流程中的方法,您应该考虑为该 @ServiceActivator(inputChannel = "sftpChannel") 调用一个 POJO 方法。在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/configuration.html#annotations.

您还可能会发现 SFTP 出站网关是对您的用例有用的组件。它有一些常见的场景实现:https://docs.spring.io/spring-integration/docs/current/reference/html/sftp.html#sftp-outbound-gateway