Moving processed files on a remote (S)FTP server using Java DSL

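I am trying to move files on a remote SFTP server once a batch job has processed them successfully, using Spring Integration and the Java DSL.

What is the best way to achieve this?

  1. Add a step to the batch job that moves the remote file?
  2. Or use the (S)FTP outbound gateway with the MV command?

I prefer the second solution, so that the batch job stays focused on the business logic, but I am struggling to implement it with the Java DSL.

I have read http://docs.spring.io/spring-integration/reference/html/ftp.html#ftp-outbound-gateway and tried to implement it like this: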

@Bean
public MessageHandler ftpOutboundGateway() {
    return Sftp.outboundGateway(SftpSessionFactory(), 
            AbstractRemoteFileOutboundGateway.Command.MV, "payload")
            .localDirectory(new File("/home/blabla/"))
            .get();

}

@Bean
public IntegrationFlow ftpInboundFlow() {
    return IntegrationFlows
            .from(
                Sftp.inboundAdapter(SftpSessionFactory())
                .regexFilter(".*\\.xml.mini$")
                ...             
               , 
                e -> e.id("sftpInboundAdapter")
                .poller(
                        Pollers.fixedRate(intCfg.getSftpPollerInMinutes(), TimeUnit.MINUTES)
                        .maxMessagesPerPoll(-1)
                        .advice(retryAdvice())
                        )
            )
            .enrichHeaders( h -> h
                    .header(FileHeaders.REMOTE_DIRECTORY,"/home/filedrop/")
                    .header(FileHeaders.REMOTE_FILE, "/home/filedrop/OFFERS.xml.mini")
                    .header(FileHeaders.RENAME_TO, "/home/filedrop/done/OFFERS.xml.mini")
            )
            .transform(fileToJobLaunchRequestTransformer())         
            .handle(jobLaunchingGw())
            .transform(jobExecutionToFileStringTransformer())
            .handle(ftpOutboundGateway())
            .handle(logger())
            .get();
}

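I know my headers should be dynamic, but I don't know how to do that, so for now I am using the name of an existing file. I get this error message (it is trying to delete the file in the destination directory!):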

Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.core.NestedIOException: Failed to delete file /home/filedrop/done/OFFERS.xml.mini; nested exception is org.springframework.core.NestedIOException: Failed to remove file: 2: No such file
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:343)
    at org.springframework.integration.file.remote.RemoteFileTemplate.rename(RemoteFileTemplate.java:290)
    at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.doMv(AbstractRemoteFileOutboundGateway.java:482)
    at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.handleRequestMessage(AbstractRemoteFileOutboundGateway.java:400)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    ... 94 more
 Caused by: org.springframework.core.NestedIOException: Failed to delete file /home/filedrop/done/OFFERS.xml.mini; nested exception is org.springframework.core.NestedIOException: Failed to remove file: 2: No such file
    at org.springframework.integration.sftp.session.SftpSession.rename(SftpSession.java:211)
    at org.springframework.integration.file.remote.RemoteFileTemplate.doInSessionWithoutResult(RemoteFileTemplate.java:300)
    at org.springframework.integration.file.remote.SessionCallbackWithoutResult.doInSession(SessionCallbackWithoutResult.java:34)
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:334)
    ... 100 more
 Caused by: org.springframework.core.NestedIOException: Failed to remove file: 2: No such file
    at org.springframework.integration.sftp.session.SftpSession.remove(SftpSession.java:83)
    at org.springframework.integration.sftp.session.SftpSession.rename(SftpSession.java:205)
    ... 103 more

Thanks for your help!

EDIT: I have since simplified the workflow a lot, but here is the solution to my previous problem:

@Bean
public IntegrationFlow ftpInboundFlow() {
    return IntegrationFlows
            .from(
                Sftp.inboundAdapter(SftpSessionFactory())
                .regexFilter(".*\\.xml$")
                ...
                , 
                e -> e.id("sftpInboundAdapter")
                .poller(Pollers.fixedRate(intCfg.getSftpPollerInMinutes(), TimeUnit.MINUTES)
                        .maxMessagesPerPoll(-1)
                        )
            )
            .enrichHeaders( h -> h
                    // headers necessary for moving remote files (ftpOutboundGateway)
                    .headerExpression(FileHeaders.RENAME_TO, "'/home/blabla/done/' + payload.getName()")
                    .headerExpression(FileHeaders.REMOTE_FILE, "payload.getName()")
                    .header(FileHeaders.REMOTE_DIRECTORY,"/home/blabla/")
                    // headers necessary for moving local files (fileOutboundGateway_MoveToProcessedDirectory)
                    .headerExpression(FileHeaders.ORIGINAL_FILE,  "payload.getAbsolutePath()" )
                    .headerExpression(FileHeaders.FILENAME,  "payload.getName()")
            )
            .transform(fileToJobLaunchRequestTransformer())         
            .handle(jobLaunchingGw(), e-> e.advice(retryAdvice()))

            .<JobExecution, Boolean>route(p -> BatchStatus.COMPLETED.equals(p.getStatus()),
                    mapping -> mapping
                            .subFlowMapping("true", sf -> sf
                                    .handle(org.springframework.batch.core.JobExecution.class,
                                            (p, h) -> myServiceActivator.jobExecutionToString(p,
                                                    (String) h.get(FileHeaders.REMOTE_DIRECTORY),
                                                    (String) h.get(FileHeaders.REMOTE_FILE)))
                                    .handle(ftpOutboundGateway())
                                    .handle(Boolean.class,
                                            (p, h) -> myServiceActivator.BooleanToString(p,
                                                    (String) h.get(FileHeaders.FILENAME)))
                                    .handle(fileOutboundGateway_MoveToProcessedDirectory()))
                            .subFlowMapping("false", sf -> sf
                                    .channel("nullChannel"))
            )

            .handle(logger())
            .get();
}

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
    return Pollers.fixedRate(500).get();
}


@Bean
public MessageHandler ftpOutboundGateway() {
    return Sftp
            .outboundGateway(SftpSessionFactory(),
                    AbstractRemoteFileOutboundGateway.Command.MV,
                    "payload")
            .renameExpression("headers['file_renameTo']").get();
}
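
To make the dynamic headers above a little more concrete, here is a plain-Java sketch of the values that the two name-based header expressions evaluate to when the payload is a java.io.File. The class name MoveHeaderValues and the local path /tmp/sftp-inbound/OFFERS.xml are made up for illustration; only File.getName() and the '/home/blabla/done/' prefix come from the flow above.

```java
import java.io.File;

// Plain-Java illustration of the values the header expressions evaluate to.
// MoveHeaderValues is a hypothetical helper, not part of Spring Integration.
class MoveHeaderValues {

    // Mirrors the expression "payload.getName()" used for the remote file name.
    static String remoteFile(File payload) {
        return payload.getName();
    }

    // Mirrors the expression "'/home/blabla/done/' + payload.getName()".
    static String renameTo(File payload) {
        return "/home/blabla/done/" + payload.getName();
    }

    public static void main(String[] args) {
        // Assumed local copy of the remote file (hypothetical path).
        File payload = new File("/tmp/sftp-inbound/OFFERS.xml");
        System.out.println(remoteFile(payload));
        System.out.println(renameTo(payload));
    }
}
```

Because the values are derived from the payload rather than hard-coded, each polled file gets its own rename target.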

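Presumably you don't have permission to rename, or the rename failed for some other reason; this exception comes from an attempt to remove the "to" file, which is made because the initial rename failed. Turn on DEBUG logging and you should see this log message...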

if (logger.isDebugEnabled()){
    logger.debug("Initial File rename failed, possibly because file already exists. Will attempt to delete file: "
            + pathTo + " and execute rename again.");
}
try {
    this.remove(pathTo);

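Since this remove() operation fails as well, your failure indicates that the rename failed for some other reason (because the "to" file evidently does not exist).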
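
The fallback described above can be sketched in plain Java to show why a failed rename surfaces as "Failed to delete file". This is only an illustration under stated assumptions: FakeRemoteSession is a hypothetical in-memory stand-in, not the real SftpSession, and a missing target directory is just one possible cause of the initial failure, picked for the demo.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

class RenameFallbackDemo {
    public static void main(String[] args) {
        FakeRemoteSession session = new FakeRemoteSession()
                .addDirectory("/home/filedrop")
                .addFile("/home/filedrop/OFFERS.xml.mini");
        try {
            // "/home/filedrop/done" was never created, so the plain rename fails.
            session.rename("/home/filedrop/OFFERS.xml.mini",
                    "/home/filedrop/done/OFFERS.xml.mini");
        } catch (IOException e) {
            // The reported error is about the delete, not the rename itself.
            System.out.println(e.getMessage());
            System.out.println("caused by: " + e.getCause().getMessage());
        }
    }
}

// Hypothetical in-memory stand-in for a remote session (an assumption for this sketch).
class FakeRemoteSession {
    private final Set<String> files = new HashSet<>();
    private final Set<String> directories = new HashSet<>();

    FakeRemoteSession addDirectory(String dir) { directories.add(dir); return this; }
    FakeRemoteSession addFile(String path) { files.add(path); return this; }
    boolean exists(String path) { return files.contains(path); }

    // Plain rename: fails if the source or target directory is missing,
    // or if the target file already exists.
    private void plainRename(String from, String to) throws IOException {
        int slash = to.lastIndexOf('/');
        String parent = slash > 0 ? to.substring(0, slash) : "/";
        if (!files.contains(from)) throw new IOException("No such file: " + from);
        if (!directories.contains(parent)) throw new IOException("No such directory: " + parent);
        if (files.contains(to)) throw new IOException("File already exists: " + to);
        files.remove(from);
        files.add(to);
    }

    void remove(String path) throws IOException {
        if (!files.remove(path)) throw new IOException("Failed to remove file: No such file");
    }

    // Rename with fallback: on failure, delete the target and try once more.
    void rename(String from, String to) throws IOException {
        try {
            plainRename(from, to);
        } catch (IOException initialFailure) {
            try {
                remove(to);
            } catch (IOException removeFailure) {
                throw new IOException("Failed to delete file " + to, removeFailure);
            }
            plainRename(from, to);
        }
    }
}
```

In this sketch the original rename error is swallowed by the fallback, which is why the message only mentions the delete; the DEBUG log is where the first failure shows up.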

Look, what I do is delete the file in the ftpInbound adapter and re-upload it to a new directory after processing.