如何在 spring 集成中使用 ExecutorChannel?

how to use ExecutorChannel in spring integration?

首先感谢关注
我在我的 spring 集成项目中定义了 ExecutorChannel 和任务执行器,用于使用 spring 批处理对消息进行异步处理,如下所示:

<bean id="ftpSessionFactory" class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
              <property name="host" value="${ftp.server.ip}"/>
              <property name="port" value="${ftp.port}"/>
              <property name="username" value="${ftp.username}"/>
              <property name="password" value="${ftp.password}"/>
              <property name="clientMode" value="2"/>
              <property name="fileType" value="2"/>
       </bean>
   <int-ftp:outbound-gateway id="gatewayGET"
                              local-directory-expression="'./backup/' +#remoteDirectory"
                              session-factory="ftpSessionFactory"
                              request-channel="toGetFilesChannel"
                              reply-channel="toRemoveChannel"
                              command="get"
                              command-options="-P"
                              expression="payload.remoteDirectory + '/' + payload.filename"/>
    <int:channel id="toRemoveChannel">
        <int:interceptors>
            <int:wire-tap channel="logger2"/>
        </int:interceptors>
    </int:channel>
 <int:transformer input-channel="toRemoveChannel" output-channel="outboundJobRequestChannel">
        <bean class="ir.ali.configuration.FileMessageToJobRequest">
            <property name="fileParameterName" value="fileName"/>
        </bean>
    </int:transformer>
 <int:channel id="outboundJobRequestChannel">
        <int:dispatcher task-executor="simpleExecutor"/>
</int:channel>   
<task:executor id="simpleExecutor" pool-size="10-1000"
                   queue-capacity="5000"/>

它工作正常,但抛出以下异常:

org.springframework.messaging.MessagingException: ; nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.net.SocketException: Connection reset
    at org.springframework.integration.dispatcher.AbstractDispatcher.wrapExceptionIfNecessary(AbstractDispatcher.java:133)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:120)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:286)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:130)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:219)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access[=12=]0(AbstractPollingEndpoint.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.call(AbstractPollingEndpoint.java:149)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.call(AbstractPollingEndpoint.java:146)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:298)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.run(ErrorHandlingTaskExecutor.java:52)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:292)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.net.SocketException: Connection reset
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:343)
    at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.doLs(AbstractRemoteFileOutboundGateway.java:416)
    at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.handleRequestMessage(AbstractRemoteFileOutboundGateway.java:392)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    ... 27 more
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:189)
    at java.net.SocketInputStream.read(SocketInputStream.java:121)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.BufferedReader.fill(BufferedReader.java:161)
    at java.io.BufferedReader.readLine(BufferedReader.java:324)
    at java.io.BufferedReader.readLine(BufferedReader.java:389)
    at org.apache.commons.net.ftp.FTPFileEntryParserImpl.readNextEntry(FTPFileEntryParserImpl.java:53)
    at org.apache.commons.net.ftp.FTPListParseEngine.readStream(FTPListParseEngine.java:133)
    at org.apache.commons.net.ftp.FTPListParseEngine.readServerList(FTPListParseEngine.java:104)
    at org.apache.commons.net.ftp.FTPClient.initiateListParsing(FTPClient.java:3302)
    at org.apache.commons.net.ftp.FTPClient.initiateListParsing(FTPClient.java:3271)
    at org.apache.commons.net.ftp.FTPClient.listFiles(FTPClient.java:2930)
    at org.springframework.integration.ftp.session.FtpSession.list(FtpSession.java:68)
    at org.springframework.integration.ftp.session.FtpSession.list(FtpSession.java:41)
    at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.listFilesInRemoteDir(AbstractRemoteFileOutboundGateway.java:582)
    at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.listFilesInRemoteDir(AbstractRemoteFileOutboundGateway.java:598)
    at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.listFilesInRemoteDir(AbstractRemoteFileOutboundGateway.java:598)
    at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.ls(AbstractRemoteFileOutboundGateway.java:551)
    at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.doInSession(AbstractRemoteFileOutboundGateway.java:420)
    at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.doInSession(AbstractRemoteFileOutboundGateway.java:416)
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:334)
    ... 32 more

该异常与频道类型无关。

Caused by: java.net.SocketException: Connection reset

这只是意味着您的 ftp 服务器在列出文件时关闭了连接。