Spring Integration Java DSL SFTP Exception Handling

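I am using Spring Integration for file transfer.

The following code works fine, but if I delete the folder from the source directory, it starts throwing exceptions.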

IntegrationFlow flow = IntegrationFlows.from(Sftp.inboundAdapter(inboundSftp)
            .localDirectory(new File(classLoader.getResource(".").getFile() + "/files/" + String.valueOf(config.getId())))
            .deleteRemoteFiles(true)
            .autoCreateLocalDirectory(true)
            .remoteDirectory(config.getInboundDirectory()), e -> e.poller(Pollers.fixedDelay(4_000)))
            .transform((File f) -> pgpEncryption.encrypt(f))
            .handle(Sftp.outboundAdapter(outboundSftp)
                    .useTemporaryFileName(false)
                    .autoCreateDirectory(true)
                    .remoteDirectory(config.getOutboundDirectory()), c -> c.advice(sftpConfig.deleteFileAdvice())
            )
            .get();

This is the exception thrown when the source folder does not exist.

2018-10-01 17:13:02.362 ERROR 12320 --- [ask-scheduler-8] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Problem occurred while synchronizing remote to local directory; nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.core.NestedIOException: Failed to list files; nested exception is 2: No such file
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:331)
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:260)
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:65)
    at org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource.doReceive(AbstractFetchLimitingMessageSource.java:43)
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:154)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:243)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:262)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.lambda$run$0(AbstractPollingEndpoint.java:391)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:385)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.core.NestedIOException: Failed to list files; nested exception is 2: No such file
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:445)
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:286)
    ... 20 more
Caused by: org.springframework.core.NestedIOException: Failed to list files; nested exception is 2: No such file
    at org.springframework.integration.sftp.session.SftpSession.list(SftpSession.java:103)
    at org.springframework.integration.sftp.session.SftpSession.list(SftpSession.java:50)
    at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.list(CachingSessionFactory.java:218)
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.lambda$synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:287)
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:436)
    ... 21 more
Caused by: 2: No such file
    at com.jcraft.jsch.ChannelSftp.throwStatusError(ChannelSftp.java:2873)
    at com.jcraft.jsch.ChannelSftp._stat(ChannelSftp.java:2225)
    at com.jcraft.jsch.ChannelSftp._stat(ChannelSftp.java:2242)
    at com.jcraft.jsch.ChannelSftp.ls(ChannelSftp.java:1592)
    at com.jcraft.jsch.ChannelSftp.ls(ChannelSftp.java:1553)
    at org.springframework.integration.sftp.session.SftpSession.list(SftpSession.java:91)
    ... 25 more

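So, what is the way to catch this exception and log an appropriate message? I will also run into other problems, such as an invalid server IP or an invalid private key.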

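By default, all exceptions thrown during message processing are sent to the 'errorChannel'.
You can define a service activator on this channel and provide your own exception handling logic.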

@Bean
@ServiceActivator(inputChannel = "errorChannel")
public MyExceptionHandler exceptionHandler() {
    return new MyExceptionHandler();
}
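
MyExceptionHandler is not defined in the snippet above. As a rough sketch of the kind of logic it might delegate to, the plain-Java helper below walks the cause chain down to the root exception and picks a log message. The class and method names are hypothetical, and matching on the text "No such file" is an assumption based on the stack trace in the question. Other failures, such as an invalid private key, fall through to the generic branch here.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.net.UnknownHostException;

// Hypothetical helper, not part of Spring Integration.
final class SftpErrorDescriber {

    // Follow getCause() until the innermost exception is reached.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    // Map the root cause to a short message suitable for a log line.
    static String describe(Throwable failure) {
        Throwable root = rootCause(failure);
        String text = String.valueOf(root.getMessage());
        if (root instanceof UnknownHostException || root instanceof ConnectException) {
            return "Cannot reach SFTP server: " + text;
        }
        // Assumption: a missing remote path is reported as "No such file".
        if (text.contains("No such file")) {
            return "Remote directory does not exist: " + text;
        }
        return "SFTP transfer failed: " + root;
    }

    public static void main(String[] args) {
        // Simulates the nesting seen in the stack trace from the question.
        Exception nested = new RuntimeException("Problem occurred while synchronizing",
                new IOException("Failed to list files",
                        new Exception("No such file")));
        System.out.println(describe(nested));
        // prints "Remote directory does not exist: No such file"
    }
}
```

In a real handler the Throwable would be the payload of the message arriving on the error channel, and the returned string would go to your logger instead of standard output.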


See: Spring Integration DSL ErrorHandling
https://docs.spring.io/spring-integration/docs/latest-ga/reference/html/configuration.html#namespace-errorhandler

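The log line "o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException:" does indeed come from the global errorChannel, which the SourcePollingChannelAdapter uses by default. You can provide your own variant through SourcePollingChannelAdapterSpec and its poller() configuration: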

e -> e.poller(Pollers.fixedDelay(4_000).errorChannel(myErrorChannel))

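This custom channel can be subscribed to like any other channel, and you can handle the ErrorMessage there in whatever way you need. A @ServiceActivator or another IntegrationFlow can be used for this.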
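
A minimal sketch of such a custom error channel with a subscribing flow might look like the configuration fragment below. The class name, the bean names sftpErrorChannel and sftpErrorFlow, and the log wording are hypothetical. It assumes the Spring Integration 5.x Java DSL is on the classpath and is not runnable on its own.

```java
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;

@Configuration
public class SftpErrorHandlingConfig {

    private static final Log LOG = LogFactory.getLog(SftpErrorHandlingConfig.class);

    // Dedicated channel for failures raised by the SFTP poller (hypothetical name).
    @Bean
    public MessageChannel sftpErrorChannel() {
        return new PublishSubscribeChannel();
    }

    // Subscribes to the custom channel and logs every failure it receives.
    @Bean
    public IntegrationFlow sftpErrorFlow() {
        return IntegrationFlows.from(sftpErrorChannel())
                .handle(message -> {
                    // Messages on an error channel carry the exception as payload.
                    Throwable failure = (Throwable) message.getPayload();
                    LOG.error("SFTP polling failed: " + failure.getMessage(), failure);
                })
                .get();
    }
}
```

The inbound adapter from the question would then point at it with e -> e.poller(Pollers.fixedDelay(4_000).errorChannel(sftpErrorChannel())).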