Why does the SFTP Outbound Gateway not start working as soon as I start its Integration Flow?
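My scenario is that I have a Spring Integration flow with an SftpInboundAdapter that is supposed to fetch a file from the SFTP server "myHost". The file contains JSON, which is converted to MyEvent entities and forwarded for further processing. The process is implemented in a task scheduled by the Spring scheduler. Therefore the integration flow is configured with autoStartup(false) and does not start automatically with the application.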
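The Spring Integration flow consists of:
- testSftpSessionFactory: provides sessions to the SFTP server
- testSftpInboundAdapter: fetches the remote SFTP file
- sftpInputChannel: a publish-subscribe channel with multiple message consumers
- sftpInputChannel-MessageHandler: takes the content of the file, converted from JSON
- deleteLocalFileService-MessageHandler: deletes the remote file after successful processing
- controlChannel: sends control commands for the integration flow
- controlChannel-ExpressionControlBusFactoryBean: starts the testSftpInboundAdapter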
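The types TransferContext, TransferChannel, and MyService are my own Java classes. Some of their fields are populated from YAML properties: host, port, user, and password for the sftpSessionFactory; remoteDirectory, remoteFilename, preserveTimestamp, localDirectory, etc. for the sftpInboundAdapter. The service processes the MyEvent entities.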
These are my SI beans:
@Configuration
public class MySftpConfiguration {

    private static final Logger LOG = LoggerFactory.getLogger(MySftpConfiguration.class);

    @Bean
    public SessionFactory<LsEntry> testSftpSessionFactory(TransferChannel transferChannel) {
        LOG.debug("testSftpSessionFactory");
        DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
        sf.setHost(transferChannel.getHost());
        sf.setPort(transferChannel.getPort());
        sf.setUser(transferChannel.getUser());
        sf.setPassword(transferChannel.getPassword());
        sf.setAllowUnknownKeys(true);
        return new CachingSessionFactory<LsEntry>(sf);
    }

    @Bean
    public IntegrationFlow testSftpInboundFlow(TransferContext context) {
        LOG.debug("testSftpInboundFlow");
        return IntegrationFlows
                .from(Sftp.inboundAdapter(testSftpSessionFactory(context.getChannel()))
                                .preserveTimestamp(context.isPreserveTimestamp())
                                .remoteDirectory(context.getRemoteDir())
                                .regexFilter(context.getRemoteFilename())
                                .deleteRemoteFiles(context.isRemoteRemove())
                                .autoCreateLocalDirectory(context.isAutoCreateLocalDir())
                                .localFilenameExpression(context.getLocalFilename())
                                .localDirectory(new File(context.getLocalDir())),
                        // the adapter is started later through the control bus
                        e -> e.id("sftpInboundAdapter")
                                .autoStartup(false)
                                .poller(Pollers.fixedDelay(5000))
                )
                .transform(Transformers.fromJson(MyEvent[].class))
                .channel("sftpInputChannel")
                .get();
    }

    @Bean
    public PublishSubscribeChannel sftpInputChannel() {
        LOG.debug("sftpInputChannel");
        return new PublishSubscribeChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpInputChannel")
    public MessageHandler handler() {
        LOG.debug("MessageHandler-handler");
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                LOG.debug("handleMessage message={}", message);
                myService().myServiceMethod((MyEvent[]) message.getPayload());
            }
        };
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpInputChannel")
    public MessageHandler deleteLocalFileService() {
        /* ... */
    }

    @Bean
    public MessageChannel controlChannel() {
        LOG.debug("controlChannel");
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "controlChannel")
    public ExpressionControlBusFactoryBean controlBus() {
        LOG.debug("controlBus");
        ExpressionControlBusFactoryBean expressionControlBusFactoryBean = new ExpressionControlBusFactoryBean();
        return expressionControlBusFactoryBean;
    }
}
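The following snippet is the task that does all the work. It first starts the SI integration flow to fetch the file from the SFTP server and then processes its content: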
@Scheduled(cron = "${harry.potter.tasks.my-task.frequency}")
public void runAsTask() {
    if (env.isEnabled() && myTaskEnv.isEnabled()) {
        LOG.info("runAsTask");
        jobLOG.info("---------------------------------------------------------------");
        jobLOG.info("Job started: Storing Event Data");
    } else {
        LOG.debug("runAsTask env={}, myTask={}", env.isEnabled(), myTaskEnv.isEnabled());
        LOG.debug("runAsTask {} not enabled: aborted.", jobId);
        return;
    }
    try {
        LOG.debug("runAsTask globally enabled={}", env.isEnabled());
        /* ... */
        List<MyEvent> myEvents = null;
        try {
            myEvents = getFile();
        }
        catch (Exception e) {
            LOG.error("{} failed! Exception {}!!!", jobId, e.getClass().getName());
            LOG.debug("... but continuing ...");
        }
        /* ... createEvents ... */
        /* ... cleanData ... */
        /* ... storeEvents ... */
        /* ... cleanHistoricData ... */
        jobLOG.info("Okay, send okay mail");
        sendOkayMail();
    }
    catch (Exception e) {
        LOG.error("{} failed! Exception: {}", jobId, e);
        jobLOG.info("Job has errors, send error mail");
        sendErrorMail();
    }
    finally {
        LOG.info("{} finished", jobId);
        jobLOG.info("Job finished");
    }
}
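The most important part of fetching the file from SFTP is the following getFile() method. It sends the start command to the sftpInboundAdapter and then waits (at most 20 seconds) on a CountDownLatch with a count of 1 for the remote file to be fetched. If the file arrives within this timeout, its entities are returned; otherwise the method returns null.
Edit 1:
Added the ChannelInterceptor before starting the SftpInboundAdapter: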
@Autowired
AbstractMessageChannel sftpInputChannel;

@Autowired
MessageChannel controlChannel;

public List<MyEvent> getFile() {
    LOG.debug("getFile");
    final List<MyEvent> events = new ArrayList<>();
    CountDownLatch latch = new CountDownLatch(1);
    sftpInputChannel.addInterceptor(new ChannelInterceptor() {
        // capture the message and count down the latch
        @Override
        public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
            LOG.debug("postSend channel={}, sent={}, message={}",
                    channel.toString(), sent, message.getPayload());
            // read and transform the content
            Arrays.stream((MyEvent[]) message.getPayload()).forEach(e -> events.add(e));
            // signal: work is done
            latch.countDown();
            ChannelInterceptor.super.postSend(message, channel, sent);
        }
    });
    boolean sent = controlChannel.send(new GenericMessage<>("@sftpInboundAdapter.start()"));
    LOG.debug("getFile 'Start' control message sent successfully={}", sent);
    try {
        LOG.debug("getFile, waiting for the file to be received, timeout=20s");
        if (latch.await(20, TimeUnit.SECONDS)) {
            return events;
        }
    }
    catch (InterruptedException e) {
        LOG.warn("getFile, job was interrupted");
        throw new IllegalStateException("expected file not available");
    }
    return null;
}
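My expectation was that the SI beans are first set up in the main thread. Then another thread is started by the Spring scheduler to execute the task. When the start message is sent through the control channel, yet another thread begins negotiating with the SFTP server and transfers the file, while the task thread waits for the countdown latch to reach zero.
The logging shows something different. Sending the start command through the control channel has no effect on the number of threads doing work. So while the task thread waits on the latch there is no activity, and nobody counts the latch down. Consequently, the full 20 seconds elapse and getFile() returns without data.
As soon as the task thread has finished, the JSch module starts negotiating with the SFTP server.
The file transfer itself does not work yet, but that is not the point for now.
This is the logging: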
2021-09-16 13:54:34.877 INFO [] --- [http-nio-8080-exec-55] com.harry.potter.Application : The following profiles are active: test
2021-09-16 13:54:37.129 INFO [] --- [http-nio-8080-exec-55] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2021-09-16 13:54:37.418 INFO [] --- [http-nio-8080-exec-55] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'taskScheduler'
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.errorChannel' has 1 subscriber(s).
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.endpoint.EventDrivenConsumer : Adding {json-to-object-transformer} as a subscriber to the 'testSftpInboundFlow.channel#0' channel
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.integration.channel.DirectChannel : Channel 'application-1.testSftpInboundFlow.channel#0' has 1 subscriber(s).
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.endpoint.EventDrivenConsumer : started bean 'testSftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/harry/potter/MySftpConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.endpoint.EventDrivenConsumer : Adding {message-handler:mySftpConfiguration.handler.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.sftpInputChannel' has 1 subscriber(s).
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.endpoint.EventDrivenConsumer : started bean 'mySftpConfiguration.handler.serviceActivator'
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.endpoint.EventDrivenConsumer : Adding {message-handler:mySftpConfiguration.deleteLocalFileService.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.sftpInputChannel' has 2 subscriber(s).
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.endpoint.EventDrivenConsumer : started bean 'mySftpConfiguration.deleteLocalFileService.serviceActivator'
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.endpoint.EventDrivenConsumer : Adding {service-activator:mySftpConfiguration.controlBus.serviceActivator} as a subscriber to the 'controlChannel' channel
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.integration.channel.DirectChannel : Channel 'application-1.controlChannel' has 1 subscriber(s).
2021-09-16 13:54:37.606 INFO [] --- [http-nio-8080-exec-55] o.s.i.endpoint.EventDrivenConsumer : started bean 'mySftpConfiguration.controlBus.serviceActivator'
2021-09-16 13:54:37.614 INFO [] --- [http-nio-8080-exec-55] o.s.c.n.eureka.InstanceInfoFactory : Setting initial instance status as: STARTING
2021-09-16 13:54:38.458 INFO [] --- [http-nio-8080-exec-55] c.l.job.MyApplication : Started MyApplication in 4.539 seconds (JVM running for 88524.557)
2021-09-16 13:55:00.000 INFO [] --- [scheduling-1] c.l.c.job.MyEvents Task : runAsTask:
2021-09-16 13:55:00.000 INFO [] --- [scheduling-1] job-MyEvents : ---------------------------------------------------------------
2021-09-16 13:55:00.000 INFO [] --- [scheduling-1] job-MyEvents : Job started:
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : runAsTask globally enabled=true
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : runAsTask enabled=true
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : runAsTask monthsKeepBackwards=12
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : runAsTask monthsUpdateFrameRelStart=2
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : runAsTask monthsUpdateFrameRelEnd=null
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : getFile
2021-09-16 13:55:00.023 INFO [] --- [scheduling-1] o.s.i.e.SourcePollingChannelAdapter : started bean 'sftpInboundAdapter'; defined in: 'class path resource [job/config/MySftpConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-09-16 13:55:00.024 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : getFile 'Start' control message sent successfully=true
2021-09-16 13:55:00.025 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : getFile, waiting for the file to be received, timeout=20s
2021-09-16 13:55:20.026 ERROR [] --- [scheduling-1] c.l.c.job.MyEvents Task : MyEvents failed! Exception java.lang.IllegalStateException!!!
2021-09-16 13:55:20.026 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : ... but continuing ...
2021-09-16 13:55:20.026 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : cleanData
2021-09-16 13:55:20.040 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : cleanData refDateFrom=2021-11-01, refDateTo=null
2021-09-16 13:55:20.042 ERROR [] --- [scheduling-1] c.l.c.job.MyEvents Task : MyEvents failed! Exception: {}
java.lang.NullPointerException: null
at com.harry.potter.job.MyEvents Task.cleanData(MyEvents Task.java:187)
at com.harry.potter.job.MyEvents Task.runAsTask(MyEvents Task.java:99)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:836)
2021-09-16 13:55:20.042 INFO [] --- [scheduling-1] job-MyEvents : Job has errors, send error mail
2021-09-16 13:55:20.042 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : sendErrorMail
2021-09-16 13:55:20.192 INFO [] --- [scheduling-1] c.l.c.job.MyEvents Task : MyEvents finished
2021-09-16 13:55:20.192 INFO [] --- [scheduling-1] job-MyEvents : Job finished
2021-09-16 13:55:20.205 INFO [] --- [scheduling-1] com.jcraft.jsch : Connecting to myHost port 22
2021-09-16 13:55:20.207 INFO [] --- [scheduling-1] com.jcraft.jsch : Connection established
2021-09-16 13:55:20.217 INFO [] --- [scheduling-1] com.jcraft.jsch : Remote version string: SSH-2.0-OpenSSH_6.6.1p1 Debian-4~bpo70+1
2021-09-16 13:55:20.217 INFO [] --- [scheduling-1] com.jcraft.jsch : Local version string: SSH-2.0-JSCH-0.1.54
2021-09-16 13:55:20.217 INFO [] --- [scheduling-1] com.jcraft.jsch : CheckCiphers: aes256-ctr,aes192-ctr,aes128-ctr,aes256-cbc,aes192-cbc,aes128-cbc,3des-ctr,arcfour,arcfour128,arcfour256
2021-09-16 13:55:20.220 INFO [] --- [scheduling-1] com.jcraft.jsch : CheckKexes: diffie-hellman-group14-sha1,ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521
2021-09-16 13:55:20.257 INFO [] --- [scheduling-1] com.jcraft.jsch : CheckSignatures: ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : SSH_MSG_KEXINIT sent
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : SSH_MSG_KEXINIT received
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server: curve25519-sha256@libssh.org,diffie-hellman-group-exchange-sha256,diffie-hellman-group-exchange-sha1,diffie-hellman-group14-sha1,diffie-hellman-group-exchange-sha1,diffie-hellman-group1-sha1
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server: ssh-rsa,ssh-ed25519
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server: chacha20-poly1305@openssh.com,aes256-gcm@openssh.com,aes128-gcm@openssh.com,aes256-ctr,aes192-ctr,aes128-ctr,aes128-cbc
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server: chacha20-poly1305@openssh.com,aes256-gcm@openssh.com,aes128-gcm@openssh.com,aes256-ctr,aes192-ctr,aes128-ctr,aes128-cbc
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server: hmac-sha2-512-etm@openssh.com,hmac-sha2-256-etm@openssh.com,hmac-ripemd160-etm@openssh.com,umac-128-etm@openssh.com,hmac-sha2-512,hmac-sha2-256,hmac-ripemd160,umac-128@openssh.com,hmac-sha1-96
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server: hmac-sha2-512-etm@openssh.com,hmac-sha2-256-etm@openssh.com,hmac-ripemd160-etm@openssh.com,umac-128-etm@openssh.com,hmac-sha2-512,hmac-sha2-256,hmac-ripemd160,umac-128@openssh.com,hmac-sha1-96
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server: none,zlib@openssh.com
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server: none,zlib@openssh.com
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server:
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server:
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client: ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521,diffie-hellman-group14-sha1,diffie-hellman-group-exchange-sha256,diffie-hellman-group-exchange-sha1,diffie-hellman-group1-sha1
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client: ssh-rsa,ssh-dss,ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client: aes128-ctr,aes128-cbc,3des-ctr,3des-cbc,blowfish-cbc,aes192-ctr,aes192-cbc,aes256-ctr,aes256-cbc
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client: aes128-ctr,aes128-cbc,3des-ctr,3des-cbc,blowfish-cbc,aes192-ctr,aes192-cbc,aes256-ctr,aes256-cbc
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client: hmac-md5,hmac-sha1,hmac-sha2-256,hmac-sha1-96,hmac-md5-96
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client: hmac-md5,hmac-sha1,hmac-sha2-256,hmac-sha1-96,hmac-md5-96
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client: none
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client: none
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client:
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client:
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server->client aes128-ctr hmac-sha2-256 none
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client->server aes128-ctr hmac-sha2-256 none
2021-09-16 13:55:20.275 INFO [] --- [scheduling-1] com.jcraft.jsch : SSH_MSG_KEXDH_INIT sent
2021-09-16 13:55:20.275 INFO [] --- [scheduling-1] com.jcraft.jsch : expecting SSH_MSG_KEXDH_REPLY
2021-09-16 13:55:20.296 INFO [] --- [scheduling-1] com.jcraft.jsch : ssh_rsa_verify: signature true
2021-09-16 13:55:20.298 INFO [] --- [scheduling-1] o.s.i.s.s.DefaultSftpSessionFactory : The authenticity of host 'myHost ' can't be established.
RSA key fingerprint is 4d:fe:f9:35:08:20:2e:76:76:55:7a:1d:5d:5d:1c:90.
Are you sure you want to continue connecting?
2021-09-16 13:55:20.298 WARN [] --- [scheduling-1] com.jcraft.jsch : Permanently added 'myHost ' (RSA) to the list of known hosts.
2021-09-16 13:55:20.299 INFO [] --- [scheduling-1] com.jcraft.jsch : SSH_MSG_NEWKEYS sent
2021-09-16 13:55:20.299 INFO [] --- [scheduling-1] com.jcraft.jsch : SSH_MSG_NEWKEYS received
2021-09-16 13:55:20.300 INFO [] --- [scheduling-1] com.jcraft.jsch : SSH_MSG_SERVICE_REQUEST sent
2021-09-16 13:55:20.300 INFO [] --- [scheduling-1] com.jcraft.jsch : SSH_MSG_SERVICE_ACCEPT received
2021-09-16 13:55:20.307 INFO [] --- [scheduling-1] com.jcraft.jsch : Authentications that can continue: publickey,keyboard-interactive,password
2021-09-16 13:55:20.307 INFO [] --- [scheduling-1] com.jcraft.jsch : Next authentication method: publickey
2021-09-16 13:55:20.307 INFO [] --- [scheduling-1] com.jcraft.jsch : Authentications that can continue: keyboard-interactive,password
2021-09-16 13:55:20.307 INFO [] --- [scheduling-1] com.jcraft.jsch : Next authentication method: keyboard-interactive
2021-09-16 13:55:20.409 INFO [] --- [scheduling-1] com.jcraft.jsch : Disconnecting from myHost port 22
2021-09-16 13:55:20.419 ERROR [] --- [scheduling-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Problem occurred while synchronizing 'data' to local directory; nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.integration.util.PoolItemNotAvailableException: Failed to obtain pooled item
...
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.integration.util.PoolItemNotAvailableException: Failed to obtain pooled item
...
Caused by: org.springframework.integration.util.PoolItemNotAvailableException: Failed to obtain pooled item
...
Caused by: java.lang.IllegalStateException: failed to create SFTP Session
...
Caused by: java.lang.IllegalStateException: failed to connect
...
Caused by: com.jcraft.jsch.JSchException: Auth cancel
...
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.e.SourcePollingChannelAdapter : stopped bean 'sftpInboundAdapter'; defined in: 'class path resource [job/config/MySftpConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {json-to-object-transformer} as a subscriber to the 'testSftpInboundFlow.channel#0' channel
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.integration.channel.DirectChannel : Channel 'application-1.testSftpInboundFlow.channel#0' has 0 subscriber(s).
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'testSftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [job/config/MySftpConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.errorChannel' has 0 subscriber(s).
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean '_org.springframework.integration.errorLogger'
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {message-handler:MySftpConfiguration.handler.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.sftpInputChannel' has 1 subscriber(s).
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'MySftpConfiguration.handler.serviceActivator'
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {message-handler:MySftpConfiguration.deleteLocalFileService.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.sftpInputChannel' has 0 subscriber(s).
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'MySftpConfiguration.deleteLocalFileService.serviceActivator'
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {service-activator:MySftpConfiguration.controlBus.serviceActivator} as a subscriber to the 'controlChannel' channel
2021-09-16 13:55:28.856 INFO [] --- [Catalina-utility-2] o.s.integration.channel.DirectChannel : Channel 'application-1.controlChannel' has 0 subscriber(s).
2021-09-16 13:55:28.856 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'MySftpConfiguration.controlBus.serviceActivator'
2021-09-16 13:55:28.857 INFO [] --- [Catalina-utility-2] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService 'taskScheduler'
2021-09-16 13:55:28.858 INFO [] --- [Catalina-utility-2] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
What am I doing wrong?
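Since you say that both tasks are started on the same thread, it looks like you are dealing with the latest Spring Boot: https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#features.spring-integration. Spring Integration now relies on the auto-configured TaskScheduler, which has a single thread in its pool.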
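The effect of a one-thread scheduler can be reproduced without Spring. The following is only a sketch using plain java.util.concurrent: the class and method names are made up for illustration, the submitted job stands in for runAsTask(), and the task it schedules stands in for the adapter's poll. It is not the code Spring runs internally.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SchedulerStarvationDemo {

    /**
     * Runs a job on a scheduler with the given pool size. The job queues a
     * second task (the stand-in for the poll) on the same scheduler and then
     * waits for it on a latch. Returns true if the second task ran before
     * the timeout expired.
     */
    public static boolean pollRanInTime(int poolSize, long timeoutMillis) throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(poolSize);
        try {
            return scheduler.submit(() -> {
                CountDownLatch latch = new CountDownLatch(1);
                // hypothetical stand-in for the poll triggered by "@sftpInboundAdapter.start()"
                Runnable poll = latch::countDown;
                scheduler.schedule(poll, 0, TimeUnit.MILLISECONDS);
                // stand-in for latch.await(20, TimeUnit.SECONDS) in getFile()
                return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            }).get();
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // With one thread the job blocks the only worker, so the poll cannot run.
        System.out.println("pool size 1: poll ran in time = " + pollRanInTime(1, 500));
        // With two threads the poll runs on the second worker while the job waits.
        System.out.println("pool size 2: poll ran in time = " + pollRanInTime(2, 500));
    }
}
```

With a pool size of 1 the waiting job occupies the only worker thread, so the queued poll cannot run until the wait ends. This matches the log, where the JSch output appears on scheduling-1 only after the job has finished.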
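You can change that configuration, or you can add a task-executor to the poller of the sftpInboundAdapter inbound channel adapter definition: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#taskexecutor-support. This way the real work is shifted from the scheduler thread to a thread provided by that executor.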
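For the first option, a minimal configuration sketch follows. It assumes the scheduler in play is the one Spring Boot auto-configures; the 'taskScheduler' initialization and the scheduling-1 thread name in the log are consistent with that, but the question does not confirm it. The value 2 is only an illustration, chosen so that the @Scheduled job and the poller can each have a thread.

```yaml
# application.yml (assumed location of the application's YAML properties)
spring:
  task:
    scheduling:
      pool:
        size: 2   # Boot's default is 1, so a blocking job starves the poller
```

If the application defines its own TaskScheduler bean instead, this property has no effect and the pool size would have to be set on that bean.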
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : runAsTask globally enabled=true
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : runAsTask enabled=true
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : runAsTask monthsKeepBackwards=12
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : runAsTask monthsUpdateFrameRelStart=2
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : runAsTask monthsUpdateFrameRelEnd=null
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : getFile
2021-09-16 13:55:00.023 INFO [] --- [scheduling-1] o.s.i.e.SourcePollingChannelAdapter : started bean 'sftpInboundAdapter'; defined in: 'class path resource [job/config/MySftpConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-09-16 13:55:00.024 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : getFile 'Start' control message sent successfully=true
2021-09-16 13:55:00.025 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : getFile, waiting for the file to be received, timeout=20s
2021-09-16 13:55:20.026 ERROR [] --- [scheduling-1] c.l.c.job.MyEvents Task : MyEvents failed! Exception java.lang.IllegalStateException!!!
2021-09-16 13:55:20.026 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : ... but continuing ...
2021-09-16 13:55:20.026 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : cleanData
2021-09-16 13:55:20.040 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : cleanData refDateFrom=2021-11-01, refDateTo=null
2021-09-16 13:55:20.042 ERROR [] --- [scheduling-1] c.l.c.job.MyEvents Task : MyEvents failed! Exception: {}
java.lang.NullPointerException: null
at com.harry.potter.job.MyEvents Task.cleanData(MyEvents Task.java:187)
at com.harry.potter.job.MyEvents Task.runAsTask(MyEvents Task.java:99)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:836)
2021-09-16 13:55:20.042 INFO [] --- [scheduling-1] job-MyEvents : Job has errors, send error mail
2021-09-16 13:55:20.042 DEBUG [] --- [scheduling-1] c.l.c.job.MyEvents Task : sendErrorMail
2021-09-16 13:55:20.192 INFO [] --- [scheduling-1] c.l.c.job.MyEvents Task : MyEvents finished
2021-09-16 13:55:20.192 INFO [] --- [scheduling-1] job-MyEvents : Job finished
2021-09-16 13:55:20.205 INFO [] --- [scheduling-1] com.jcraft.jsch : Connecting to myHost port 22
2021-09-16 13:55:20.207 INFO [] --- [scheduling-1] com.jcraft.jsch : Connection established
2021-09-16 13:55:20.217 INFO [] --- [scheduling-1] com.jcraft.jsch : Remote version string: SSH-2.0-OpenSSH_6.6.1p1 Debian-4~bpo70+1
2021-09-16 13:55:20.217 INFO [] --- [scheduling-1] com.jcraft.jsch : Local version string: SSH-2.0-JSCH-0.1.54
2021-09-16 13:55:20.217 INFO [] --- [scheduling-1] com.jcraft.jsch : CheckCiphers: aes256-ctr,aes192-ctr,aes128-ctr,aes256-cbc,aes192-cbc,aes128-cbc,3des-ctr,arcfour,arcfour128,arcfour256
2021-09-16 13:55:20.220 INFO [] --- [scheduling-1] com.jcraft.jsch : CheckKexes: diffie-hellman-group14-sha1,ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521
2021-09-16 13:55:20.257 INFO [] --- [scheduling-1] com.jcraft.jsch : CheckSignatures: ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : SSH_MSG_KEXINIT sent
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : SSH_MSG_KEXINIT received
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server: curve25519-sha256@libssh.org,diffie-hellman-group-exchange-sha256,diffie-hellman-group-exchange-sha1,diffie-hellman-group14-sha1,diffie-hellman-group-exchange-sha1,diffie-hellman-group1-sha1
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server: ssh-rsa,ssh-ed25519
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server: chacha20-poly1305@openssh.com,aes256-gcm@openssh.com,aes128-gcm@openssh.com,aes256-ctr,aes192-ctr,aes128-ctr,aes128-cbc
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server: chacha20-poly1305@openssh.com,aes256-gcm@openssh.com,aes128-gcm@openssh.com,aes256-ctr,aes192-ctr,aes128-ctr,aes128-cbc
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server: hmac-sha2-512-etm@openssh.com,hmac-sha2-256-etm@openssh.com,hmac-ripemd160-etm@openssh.com,umac-128-etm@openssh.com,hmac-sha2-512,hmac-sha2-256,hmac-ripemd160,umac-128@openssh.com,hmac-sha1-96
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server: hmac-sha2-512-etm@openssh.com,hmac-sha2-256-etm@openssh.com,hmac-ripemd160-etm@openssh.com,umac-128-etm@openssh.com,hmac-sha2-512,hmac-sha2-256,hmac-ripemd160,umac-128@openssh.com,hmac-sha1-96
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server: none,zlib@openssh.com
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server: none,zlib@openssh.com
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server:
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server:
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client: ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521,diffie-hellman-group14-sha1,diffie-hellman-group-exchange-sha256,diffie-hellman-group-exchange-sha1,diffie-hellman-group1-sha1
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client: ssh-rsa,ssh-dss,ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client: aes128-ctr,aes128-cbc,3des-ctr,3des-cbc,blowfish-cbc,aes192-ctr,aes192-cbc,aes256-ctr,aes256-cbc
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client: aes128-ctr,aes128-cbc,3des-ctr,3des-cbc,blowfish-cbc,aes192-ctr,aes192-cbc,aes256-ctr,aes256-cbc
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client: hmac-md5,hmac-sha1,hmac-sha2-256,hmac-sha1-96,hmac-md5-96
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client: hmac-md5,hmac-sha1,hmac-sha2-256,hmac-sha1-96,hmac-md5-96
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client: none
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client: none
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client:
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client:
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: server->client aes128-ctr hmac-sha2-256 none
2021-09-16 13:55:20.258 INFO [] --- [scheduling-1] com.jcraft.jsch : kex: client->server aes128-ctr hmac-sha2-256 none
2021-09-16 13:55:20.275 INFO [] --- [scheduling-1] com.jcraft.jsch : SSH_MSG_KEXDH_INIT sent
2021-09-16 13:55:20.275 INFO [] --- [scheduling-1] com.jcraft.jsch : expecting SSH_MSG_KEXDH_REPLY
2021-09-16 13:55:20.296 INFO [] --- [scheduling-1] com.jcraft.jsch : ssh_rsa_verify: signature true
2021-09-16 13:55:20.298 INFO [] --- [scheduling-1] o.s.i.s.s.DefaultSftpSessionFactory : The authenticity of host 'myHost ' can't be established.
RSA key fingerprint is 4d:fe:f9:35:08:20:2e:76:76:55:7a:1d:5d:5d:1c:90.
Are you sure you want to continue connecting?
2021-09-16 13:55:20.298 WARN [] --- [scheduling-1] com.jcraft.jsch : Permanently added 'myHost ' (RSA) to the list of known hosts.
2021-09-16 13:55:20.299 INFO [] --- [scheduling-1] com.jcraft.jsch : SSH_MSG_NEWKEYS sent
2021-09-16 13:55:20.299 INFO [] --- [scheduling-1] com.jcraft.jsch : SSH_MSG_NEWKEYS received
2021-09-16 13:55:20.300 INFO [] --- [scheduling-1] com.jcraft.jsch : SSH_MSG_SERVICE_REQUEST sent
2021-09-16 13:55:20.300 INFO [] --- [scheduling-1] com.jcraft.jsch : SSH_MSG_SERVICE_ACCEPT received
2021-09-16 13:55:20.307 INFO [] --- [scheduling-1] com.jcraft.jsch : Authentications that can continue: publickey,keyboard-interactive,password
2021-09-16 13:55:20.307 INFO [] --- [scheduling-1] com.jcraft.jsch : Next authentication method: publickey
2021-09-16 13:55:20.307 INFO [] --- [scheduling-1] com.jcraft.jsch : Authentications that can continue: keyboard-interactive,password
2021-09-16 13:55:20.307 INFO [] --- [scheduling-1] com.jcraft.jsch : Next authentication method: keyboard-interactive
2021-09-16 13:55:20.409 INFO [] --- [scheduling-1] com.jcraft.jsch : Disconnecting from myHost port 22
2021-09-16 13:55:20.419 ERROR [] --- [scheduling-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Problem occurred while synchronizing 'data' to local directory; nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.integration.util.PoolItemNotAvailableException: Failed to obtain pooled item
...
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.integration.util.PoolItemNotAvailableException: Failed to obtain pooled item
...
Caused by: org.springframework.integration.util.PoolItemNotAvailableException: Failed to obtain pooled item
...
Caused by: java.lang.IllegalStateException: failed to create SFTP Session
...
Caused by: java.lang.IllegalStateException: failed to connect
...
Caused by: com.jcraft.jsch.JSchException: Auth cancel
...
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.e.SourcePollingChannelAdapter : stopped bean 'sftpInboundAdapter'; defined in: 'class path resource [job/config/MySftpConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {json-to-object-transformer} as a subscriber to the 'testSftpInboundFlow.channel#0' channel
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.integration.channel.DirectChannel : Channel 'application-1.testSftpInboundFlow.channel#0' has 0 subscriber(s).
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'testSftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [job/config/MySftpConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.errorChannel' has 0 subscriber(s).
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean '_org.springframework.integration.errorLogger'
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {message-handler:MySftpConfiguration.handler.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.sftpInputChannel' has 1 subscriber(s).
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'MySftpConfiguration.handler.serviceActivator'
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {message-handler:MySftpConfiguration.deleteLocalFileService.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.sftpInputChannel' has 0 subscriber(s).
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'MySftpConfiguration.deleteLocalFileService.serviceActivator'
2021-09-16 13:55:28.855 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {service-activator:MySftpConfiguration.controlBus.serviceActivator} as a subscriber to the 'controlChannel' channel
2021-09-16 13:55:28.856 INFO [] --- [Catalina-utility-2] o.s.integration.channel.DirectChannel : Channel 'application-1.controlChannel' has 0 subscriber(s).
2021-09-16 13:55:28.856 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'MySftpConfiguration.controlBus.serviceActivator'
2021-09-16 13:55:28.857 INFO [] --- [Catalina-utility-2] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService 'taskScheduler'
2021-09-16 13:55:28.858 INFO [] --- [Catalina-utility-2] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
What am I doing wrong?
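Since you say that both tasks are started on the same thread, it looks like you are dealing with the latest Spring Boot: https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#features.spring-integration. Spring Integration now relies on the auto-configured TaskScheduler, which has a single thread in its pool.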
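You can change that configuration, or you can add a task-executor to the poller of the sftpInboundAdapter inbound channel adapter definition: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#taskexecutor-support. That way the real work is shifted from the scheduler thread to a thread provided by that executor.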
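To make the single-thread effect concrete, here is a minimal sketch in plain java.util.concurrent. It is not Spring code, and the names (SchedulerStarvationDemo, latchReleased, fileReceived) are made up for illustration. A "job" runs on a scheduler, queues a "poll" on the same scheduler, and then blocks on a latch that only the poll can release. This mirrors, under those assumptions, what the log in the question suggests: the poll only gets a thread after the job has given up. As far as I know, the pool size of the auto-configured scheduler in Spring Boot is set with the spring.task.scheduling.pool.size property.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the scheduled job and the poller; not Spring API.
class SchedulerStarvationDemo {

    // Returns true if the "poll" released the latch before the "job" timed out.
    static boolean latchReleased(int poolSize) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(poolSize);
        CountDownLatch fileReceived = new CountDownLatch(1);
        // Stand-in for the poller: it only signals that a file arrived.
        Runnable poll = fileReceived::countDown;
        try {
            // Stand-in for the scheduled job, running on a scheduler thread.
            Future<Boolean> job = scheduler.submit(() -> {
                // "Start the adapter": the poll is queued on the same scheduler.
                scheduler.schedule(poll, 0, TimeUnit.MILLISECONDS);
                // Block this scheduler thread while waiting for the poll.
                return fileReceived.await(300, TimeUnit.MILLISECONDS);
            });
            return job.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool size 1: " + latchReleased(1));
        System.out.println("pool size 2: " + latchReleased(2));
    }
}
```

With one thread the wait should time out (false), because the only thread that could run the poll is the one waiting for it. With two threads the latch should be released (true). Note that in this sketch the queued poll still needs a free scheduler thread to be triggered at all, so a job that blocks the only scheduler thread stalls it regardless of where the poll's work would run afterwards.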