如何处理 spring 集成流程中的 FtpOutboundAdapter 连接异常

How to handle FtpOutboundAdapter connection exception in spring integration flow

我的流程以这样的方式结束:

.handle(ftpAdapters.rtwFtpOutboundHandler(), out -> out.advice(after()))
.get();

在建议中,我们有代码要处理:

@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
    logger.debug("Evaluating expression advice. ");
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setSuccessChannel(rtwSourceDeletionChannel());
    advice.setFailureChannel(rtwFtpFailureHandleChannel());
    advice.setPropagateEvaluationFailures(true);
    return advice;
}

但是上面的代码不足以处理,ftp 连接 bean 引发了异常。 (我使用的是默认会话工厂。

如何处理流程中的这些异常?

异常跟踪:

2019-02-06 16:10:12.312 ERROR 1088 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [rtwFtpOutboundHandler]; nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.integration.util.PoolItemNotAvailableException: Failed to obtain pooled item, failedMessage=GenericMessage [payload=5177_Theivendran_29031987_3101201910352911.pdf, headers={file_name=5177_Theivendran_29031987_3101201910352911.pdf, id=0343693b-038a-a685-720d-1fff86569154, timestamp=1549469391281}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:179)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.execute(AbstractRequestHandlerAdvice.java:75)
    at org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice.doInvoke(ExpressionEvaluatingRequestHandlerAdvice.java:211)
    at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.invoke(AbstractRequestHandlerAdvice.java:70)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
    at com.sun.proxy.$Proxy106.handleMessage(Unknown Source)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:431)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:129)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:431)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:129)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:431)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:129)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:431)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:129)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:234)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:390)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null(AbstractPollingEndpoint.java:277)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute[=13=](ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller(AbstractPollingEndpoint.java:274)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.integration.util.PoolItemNotAvailableException: Failed to obtain pooled item
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:451)
    at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:298)
    at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:286)
    at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:278)
    at org.springframework.integration.file.remote.handler.FileTransferringMessageHandler.handleMessageInternal(FileTransferringMessageHandler.java:204)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
    ... 103 more
Caused by: org.springframework.integration.util.PoolItemNotAvailableException: Failed to obtain pooled item
    at org.springframework.integration.util.SimplePool.getItem(SimplePool.java:182)
    at org.springframework.integration.file.remote.session.CachingSessionFactory.getSession(CachingSessionFactory.java:135)
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:437)
    ... 108 more
Caused by: java.lang.IllegalStateException: failed to create FTPClient
    at org.springframework.integration.ftp.session.AbstractFtpSessionFactory.getSession(AbstractFtpSessionFactory.java:170)
    at org.springframework.integration.ftp.session.AbstractFtpSessionFactory.getSession(AbstractFtpSessionFactory.java:41)
    at org.springframework.integration.file.remote.session.CachingSessionFactory.createForPool(CachingSessionFactory.java:84)
    at org.springframework.integration.file.remote.session.CachingSessionFactory.createForPool(CachingSessionFactory.java:81)
    at org.springframework.integration.util.SimplePool.doGetItem(SimplePool.java:192)
    at org.springframework.integration.util.SimplePool.getItem(SimplePool.java:173)
    ... 110 more
Caused by: java.net.ConnectException: Connection timed out: connect
    at java.base/java.net.PlainSocketImpl.connect0(Native Method)
    at java.base/java.net.PlainSocketImpl.socketConnect(PlainSocketImpl.java:101)
    at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
    at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
    at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
    at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:403)
    at java.base/java.net.Socket.connect(Socket.java:591)
    at org.apache.commons.net.SocketClient._connect(SocketClient.java:243)
    at org.apache.commons.net.SocketClient.connect(SocketClient.java:202)
    at org.springframework.integration.ftp.session.AbstractFtpSessionFactory.createClient(AbstractFtpSessionFactory.java:193)
    at org.springframework.integration.ftp.session.AbstractFtpSessionFactory.getSession(AbstractFtpSessionFactory.java:167)
    ... 115 more

根据您的堆栈跟踪,ExpressionEvaluatingRequestHandlerAdvice 捕获了 Connection timed out: connect 的异常并按原样重新抛出它。因为你没有配置 onFailureExpression:

catch (Exception e) {
        Exception actualException = this.unwrapExceptionIfNecessary(e);
        if (this.onFailureExpression != null) {
            Object evalResult = this.evaluateFailureExpression(message, actualException);
            if (this.returnFailureExpressionResult) {
                return evalResult;
            }
        }
        if (!this.trapException) {
            throw actualException;
        }
        return null;
    }

行为是

advice.setSuccessChannel(rtwSourceDeletionChannel());
advice.setFailureChannel(rtwFtpFailureHandleChannel());

没有配置适当的表达式时没有意义。

因此,如果您想使用该建议处理 FTP 异常,则需要在那里配置一个 onFailureExpressionsetOnFailureExpressionString("#exception") 之类的东西应该就足够了。这样,捕获的 FTP 异常将作为有效负载发送到配置的 rtwFtpFailureHandleChannel.

我认为现在是改进 ExpressionEvaluatingRequestHandlerAdvice 以在配置通道时要求表达的时候了。否则它们没有意义。

修复在这里:https://github.com/spring-projects/spring-integration/pull/2738