如何在 spring 集成中使用 ExecutorChannel?
how to use ExecutorChannel in spring integration?
首先感谢关注
我在我的 spring 集成项目中定义了 ExecutorChannel 和任务执行器,用于使用 spring 批处理对消息进行异步处理,如下所示:
<bean id="ftpSessionFactory" class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
<property name="host" value="${ftp.server.ip}"/>
<property name="port" value="${ftp.port}"/>
<property name="username" value="${ftp.username}"/>
<property name="password" value="${ftp.password}"/>
<property name="clientMode" value="2"/>
<property name="fileType" value="2"/>
</bean>
<int-ftp:outbound-gateway id="gatewayGET"
local-directory-expression="'./backup/' +#remoteDirectory"
session-factory="ftpSessionFactory"
request-channel="toGetFilesChannel"
reply-channel="toRemoveChannel"
command="get"
command-options="-P"
expression="payload.remoteDirectory + '/' + payload.filename"/>
<int:channel id="toRemoveChannel">
<int:interceptors>
<int:wire-tap channel="logger2"/>
</int:interceptors>
</int:channel>
<int:transformer input-channel="toRemoveChannel" output-channel="outboundJobRequestChannel">
<bean class="ir.ali.configuration.FileMessageToJobRequest">
<property name="fileParameterName" value="fileName"/>
</bean>
</int:transformer>
<int:channel id="outboundJobRequestChannel">
<int:dispatcher task-executor="simpleExecutor"/>
</int:channel>
<task:executor id="simpleExecutor" pool-size="10-1000"
queue-capacity="5000"/>
它工作正常,但抛出以下异常:
org.springframework.messaging.MessagingException: ; nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.net.SocketException: Connection reset
at org.springframework.integration.dispatcher.AbstractDispatcher.wrapExceptionIfNecessary(AbstractDispatcher.java:133)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:120)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:286)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:130)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:219)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access[=12=]0(AbstractPollingEndpoint.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.call(AbstractPollingEndpoint.java:149)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.call(AbstractPollingEndpoint.java:146)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:298)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.run(ErrorHandlingTaskExecutor.java:52)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:292)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.net.SocketException: Connection reset
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:343)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.doLs(AbstractRemoteFileOutboundGateway.java:416)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.handleRequestMessage(AbstractRemoteFileOutboundGateway.java:392)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
... 27 more
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:189)
at java.net.SocketInputStream.read(SocketInputStream.java:121)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
at org.apache.commons.net.ftp.FTPFileEntryParserImpl.readNextEntry(FTPFileEntryParserImpl.java:53)
at org.apache.commons.net.ftp.FTPListParseEngine.readStream(FTPListParseEngine.java:133)
at org.apache.commons.net.ftp.FTPListParseEngine.readServerList(FTPListParseEngine.java:104)
at org.apache.commons.net.ftp.FTPClient.initiateListParsing(FTPClient.java:3302)
at org.apache.commons.net.ftp.FTPClient.initiateListParsing(FTPClient.java:3271)
at org.apache.commons.net.ftp.FTPClient.listFiles(FTPClient.java:2930)
at org.springframework.integration.ftp.session.FtpSession.list(FtpSession.java:68)
at org.springframework.integration.ftp.session.FtpSession.list(FtpSession.java:41)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.listFilesInRemoteDir(AbstractRemoteFileOutboundGateway.java:582)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.listFilesInRemoteDir(AbstractRemoteFileOutboundGateway.java:598)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.listFilesInRemoteDir(AbstractRemoteFileOutboundGateway.java:598)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.ls(AbstractRemoteFileOutboundGateway.java:551)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.doInSession(AbstractRemoteFileOutboundGateway.java:420)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.doInSession(AbstractRemoteFileOutboundGateway.java:416)
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:334)
... 32 more
该异常与频道类型无关。
Caused by: java.net.SocketException: Connection reset
这只是意味着您的 ftp 服务器在列出文件时关闭了连接。
首先感谢关注
我在我的 spring 集成项目中定义了 ExecutorChannel 和任务执行器,用于使用 spring 批处理对消息进行异步处理,如下所示:
<bean id="ftpSessionFactory" class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
<property name="host" value="${ftp.server.ip}"/>
<property name="port" value="${ftp.port}"/>
<property name="username" value="${ftp.username}"/>
<property name="password" value="${ftp.password}"/>
<property name="clientMode" value="2"/>
<property name="fileType" value="2"/>
</bean>
<int-ftp:outbound-gateway id="gatewayGET"
local-directory-expression="'./backup/' +#remoteDirectory"
session-factory="ftpSessionFactory"
request-channel="toGetFilesChannel"
reply-channel="toRemoveChannel"
command="get"
command-options="-P"
expression="payload.remoteDirectory + '/' + payload.filename"/>
<int:channel id="toRemoveChannel">
<int:interceptors>
<int:wire-tap channel="logger2"/>
</int:interceptors>
</int:channel>
<int:transformer input-channel="toRemoveChannel" output-channel="outboundJobRequestChannel">
<bean class="ir.ali.configuration.FileMessageToJobRequest">
<property name="fileParameterName" value="fileName"/>
</bean>
</int:transformer>
<int:channel id="outboundJobRequestChannel">
<int:dispatcher task-executor="simpleExecutor"/>
</int:channel>
<task:executor id="simpleExecutor" pool-size="10-1000"
queue-capacity="5000"/>
它工作正常,但抛出以下异常:
org.springframework.messaging.MessagingException: ; nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.net.SocketException: Connection reset
at org.springframework.integration.dispatcher.AbstractDispatcher.wrapExceptionIfNecessary(AbstractDispatcher.java:133)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:120)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:286)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:130)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:219)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access[=12=]0(AbstractPollingEndpoint.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.call(AbstractPollingEndpoint.java:149)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.call(AbstractPollingEndpoint.java:146)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:298)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.run(ErrorHandlingTaskExecutor.java:52)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:292)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.net.SocketException: Connection reset
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:343)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.doLs(AbstractRemoteFileOutboundGateway.java:416)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.handleRequestMessage(AbstractRemoteFileOutboundGateway.java:392)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
... 27 more
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:189)
at java.net.SocketInputStream.read(SocketInputStream.java:121)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
at org.apache.commons.net.ftp.FTPFileEntryParserImpl.readNextEntry(FTPFileEntryParserImpl.java:53)
at org.apache.commons.net.ftp.FTPListParseEngine.readStream(FTPListParseEngine.java:133)
at org.apache.commons.net.ftp.FTPListParseEngine.readServerList(FTPListParseEngine.java:104)
at org.apache.commons.net.ftp.FTPClient.initiateListParsing(FTPClient.java:3302)
at org.apache.commons.net.ftp.FTPClient.initiateListParsing(FTPClient.java:3271)
at org.apache.commons.net.ftp.FTPClient.listFiles(FTPClient.java:2930)
at org.springframework.integration.ftp.session.FtpSession.list(FtpSession.java:68)
at org.springframework.integration.ftp.session.FtpSession.list(FtpSession.java:41)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.listFilesInRemoteDir(AbstractRemoteFileOutboundGateway.java:582)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.listFilesInRemoteDir(AbstractRemoteFileOutboundGateway.java:598)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.listFilesInRemoteDir(AbstractRemoteFileOutboundGateway.java:598)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.ls(AbstractRemoteFileOutboundGateway.java:551)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.doInSession(AbstractRemoteFileOutboundGateway.java:420)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.doInSession(AbstractRemoteFileOutboundGateway.java:416)
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:334)
... 32 more
该异常与频道类型无关。
Caused by: java.net.SocketException: Connection reset
这只是意味着您的 ftp 服务器在列出文件时关闭了连接。