Why does the SFTP Outbound Gateway not start working as soon as I start its Integration Flow?

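My scenario is that I have a Spring Integration flow with an SftpInboundAdapter that is supposed to fetch a file from the SFTP server "myHost". The file contains JSON, which is converted to MyEvent entities and forwarded for further processing. The process is implemented in a task scheduled by the Spring scheduler. Therefore the integration flow is not started automatically with the application, which is achieved with autoStartup(false).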

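The Spring Integration flow is:

The types TransferContext, TransferChannel and MyService are my own Java classes. Some of their fields come from YAML properties, which supply host, port, user and password for the sftpSessionFactory, and remoteDirectory, remoteFilename, preserveTimestamp, localDirectory etc. for the sftpInboundAdapter. The service processes the MyEvent entities.

These are my SI beans: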

@Configuration
public class MySftpConfiguration {
    private static final Logger LOG = LoggerFactory.getLogger(MySftpConfiguration.class);

    @Bean
    public SessionFactory<LsEntry> testSftpSessionFactory(TransferChannel transferChannel) {
        LOG.debug("testSftpSessionFactory");
        DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
        sf.setHost(transferChannel.getHost());
        sf.setPort(transferChannel.getPort());
        sf.setUser(transferChannel.getUser());
        sf.setPassword(transferChannel.getPassword());
        sf.setAllowUnknownKeys(true);
        return new CachingSessionFactory<LsEntry>(sf);
    }

    @Bean
    public IntegrationFlow testSftpInboundFlow(TransferContext context) {
        LOG.debug("testSftpInboundFlow");
        return IntegrationFlows
            .from(Sftp.inboundAdapter(testSftpSessionFactory(context.getChannel()))
                    .preserveTimestamp(context.isPreserveTimestamp())
                    .remoteDirectory(context.getRemoteDir())
                    .regexFilter(context.getRemoteFilename())
                    .deleteRemoteFiles(context.isRemoteRemove())
                    .autoCreateLocalDirectory(context.isAutoCreateLocalDir())
                    .localFilenameExpression(context.getLocalFilename())
                    .localDirectory(new File(context.getLocalDir())),
                e -> e.id("sftpInboundAdapter")
                    .autoStartup(false)
                    .poller(Pollers.fixedDelay(5000))
                )
            .transform(Transformers.fromJson(MyEvent[].class))
            .channel("sftpInputChannel")
            .get();
    }

    @Bean
    public PublishSubscribeChannel sftpInputChannel() {
        LOG.debug("sftpInputChannel");
        return new PublishSubscribeChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpInputChannel")
    public MessageHandler handler() {
        LOG.debug("MessageHandler-handler");
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                LOG.debug("handleMessage message={}", message);
                myService().myServiceMethod((MyEvent[]) message.getPayload());
            }
        };
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpInputChannel")
    public MessageHandler deleteLocalFileService() {
        /* ... */
    }

    @Bean
    public MessageChannel controlChannel() {
        LOG.debug("controlChannel");
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "controlChannel")
    public ExpressionControlBusFactoryBean controlBus() {
        LOG.debug("controlBus");
        ExpressionControlBusFactoryBean expressionControlBusFactoryBean = new ExpressionControlBusFactoryBean();
        return expressionControlBusFactoryBean;
    }
}

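The following snippet is the task that does all the work. It first starts the SI integration flow to fetch the file from the SFTP server and then processes its content: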

    @Scheduled(cron = "${harry.potter.tasks.my-task.frequency}")
    public void runAsTask() {

        if (env.isEnabled() && myTaskEnv.isEnabled()) {
            LOG.info("runAsTask");
            jobLOG.info("---------------------------------------------------------------");
            jobLOG.info("Job started: Storing Event Data");
        } else {
            LOG.debug("runAsTask env={}, myTask={}", env.isEnabled(), myTaskEnv.isEnabled());
            LOG.debug("runAsTask {} not enabled: aborted.", jobId);
            return;
        }

        try {

            LOG.debug("runAsTask globally enabled={}", env.isEnabled());
            /* ... */
    
            List<MyEvent> myEvents = null;
            try {           
                myEvents = getFile();
            }
            catch (Exception e) {
                LOG.error("{} failed! Exception {}!!!", jobId, e.getClass().getName());
                LOG.debug("... but continuing ...");
            }

            /* ... createEvents ... */

            /* ... cleanData ... */

            /* ... storeEvents ... */

            /* ... cleanHistoricData ... */

            jobLOG.info("Okay, send okay mail");
            sendOkayMail();

        }
        catch (Exception e) {
            LOG.error("{} failed! Exception: {}", jobId, e);
            jobLOG.info("Job has errors, send error mail");
            sendErrorMail();
        }
        finally {
            LOG.info("{} finished", jobId);
            jobLOG.info("Job finished");
        }
    }

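The most important part of fetching the file from SFTP is the following getFile() method. It sends a start command to the sftpInboundAdapter and then creates a CountDownLatch with a count of 1 to wait (at most 20 seconds) for the remote file to be fetched. If the file is fetched within this timeout, its entities are returned, otherwise nothing (null):

Edit 1:

Added a ChannelInterceptor before starting the SftpInboundAdapter: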

@Autowired
AbstractMessageChannel sftpInputChannel;

@Autowired
MessageChannel controlChannel;

public List<MyEvent> getFile() {
        LOG.debug("getFile");

        final List<MyEvent> events = new ArrayList<>();

        CountDownLatch latch = new CountDownLatch(1);

        sftpInputChannel.addInterceptor(new ChannelInterceptor() {
            // capture the message and count down the latch
            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                LOG.debug("postSend channel={}, sent={}, message={}",
                        channel.toString(), sent, message.getPayload());

                // read and transform the content
                Arrays.stream((MyEvent[]) message.getPayload()).forEach(e -> events.add(e));

                // signal: work is done
                latch.countDown();
                ChannelInterceptor.super.postSend(message, channel, sent);
            }
            
        });

        boolean sent = controlChannel.send(new GenericMessage<>("@sftpInboundAdapter.start()"));
        LOG.debug("getFile 'Start' control message sent successfully={}", sent);

        try {
            LOG.debug("getFile, waiting for the file to be received, timeout=20s");
            if (latch.await(20, TimeUnit.SECONDS)) {
                return events;
            }
        }
        catch (InterruptedException e) {
            LOG.warn("getFile, job was interrupted");
            throw new IllegalStateException("expected file not available");
        }

        return null;
    }

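When this runs, my expectation was that the SI beans are first set up in the main thread. Then another thread is started by the Spring scheduler to execute the task. When the start message is sent through the control channel, yet another thread begins negotiating with the SFTP server and transferring the file, while the task thread waits for the countdown latch to reach zero.

The logging shows a different picture. Sending the start command through the control channel has no effect on the number of threads doing work. So the task thread starts waiting on the latch, but there is no activity and nobody counts the latch down. As a result the full 20 seconds elapse and the getFile() method returns no data.

As soon as the task thread has finished, the JSch module starts negotiating with the SFTP server. The file transfer itself does not work yet, but that is not of interest at the moment.

This is the logging: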

2021-09-16 13:54:34.877  INFO [] --- [http-nio-8080-exec-55] com.harry.potter.Application         : The following profiles are active: test
2021-09-16 13:54:37.129  INFO [] --- [http-nio-8080-exec-55]  o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2021-09-16 13:54:37.418  INFO [] --- [http-nio-8080-exec-55]  o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 'taskScheduler'
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.errorChannel' has 1 subscriber(s).
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.endpoint.EventDrivenConsumer       : Adding {json-to-object-transformer} as a subscriber to the 'testSftpInboundFlow.channel#0' channel
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.integration.channel.DirectChannel    : Channel 'application-1.testSftpInboundFlow.channel#0' has 1 subscriber(s).
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.endpoint.EventDrivenConsumer       : started bean 'testSftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/harry/potter/MySftpConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.endpoint.EventDrivenConsumer       : Adding {message-handler:mySftpConfiguration.handler.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.sftpInputChannel' has 1 subscriber(s).
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.endpoint.EventDrivenConsumer       : started bean 'mySftpConfiguration.handler.serviceActivator'
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.endpoint.EventDrivenConsumer       : Adding {message-handler:mySftpConfiguration.deleteLocalFileService.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.sftpInputChannel' has 2 subscriber(s).
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.endpoint.EventDrivenConsumer       : started bean 'mySftpConfiguration.deleteLocalFileService.serviceActivator'
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.endpoint.EventDrivenConsumer       : Adding {service-activator:mySftpConfiguration.controlBus.serviceActivator} as a subscriber to the 'controlChannel' channel
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.integration.channel.DirectChannel    : Channel 'application-1.controlChannel' has 1 subscriber(s).
2021-09-16 13:54:37.606  INFO [] --- [http-nio-8080-exec-55]  o.s.i.endpoint.EventDrivenConsumer       : started bean 'mySftpConfiguration.controlBus.serviceActivator'
2021-09-16 13:54:37.614  INFO [] --- [http-nio-8080-exec-55]  o.s.c.n.eureka.InstanceInfoFactory       : Setting initial instance status as: STARTING
2021-09-16 13:54:38.458  INFO [] --- [http-nio-8080-exec-55]  c.l.job.MyApplication         : Started MyApplication in 4.539 seconds (JVM running for 88524.557)
2021-09-16 13:55:00.000  INFO [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : runAsTask: 
2021-09-16 13:55:00.000  INFO [] --- [scheduling-1]  job-MyEvents                             : ---------------------------------------------------------------
2021-09-16 13:55:00.000  INFO [] --- [scheduling-1]  job-MyEvents                             : Job started: 
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : runAsTask globally enabled=true
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : runAsTask enabled=true
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : runAsTask monthsKeepBackwards=12
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : runAsTask monthsUpdateFrameRelStart=2
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : runAsTask monthsUpdateFrameRelEnd=null
2021-09-16 13:55:00.000 DEBUG [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : getFile
2021-09-16 13:55:00.023  INFO [] --- [scheduling-1]  o.s.i.e.SourcePollingChannelAdapter      : started bean 'sftpInboundAdapter'; defined in: 'class path resource [job/config/MySftpConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-09-16 13:55:00.024 DEBUG [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : getFile 'Start' control message sent successfully=true
2021-09-16 13:55:00.025 DEBUG [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : getFile, waiting for the file to be received, timeout=20s
2021-09-16 13:55:20.026 ERROR [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : MyEvents          failed! Exception java.lang.IllegalStateException!!!
2021-09-16 13:55:20.026 DEBUG [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : ... but continuing ...
2021-09-16 13:55:20.026 DEBUG [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : cleanData
2021-09-16 13:55:20.040 DEBUG [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : cleanData refDateFrom=2021-11-01, refDateTo=null
2021-09-16 13:55:20.042 ERROR [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : MyEvents          failed! Exception: {}

java.lang.NullPointerException: null
    at com.harry.potter.job.MyEvents                Task.cleanData(MyEvents         Task.java:187)
    at com.harry.potter.job.MyEvents                Task.runAsTask(MyEvents         Task.java:99)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:836)

2021-09-16 13:55:20.042  INFO [] --- [scheduling-1]  job-MyEvents                             : Job has errors, send error mail
2021-09-16 13:55:20.042 DEBUG [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : sendErrorMail
2021-09-16 13:55:20.192  INFO [] --- [scheduling-1]  c.l.c.job.MyEvents                Task   : MyEvents          finished
2021-09-16 13:55:20.192  INFO [] --- [scheduling-1]  job-MyEvents                             : Job finished
2021-09-16 13:55:20.205  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : Connecting to myHost   port 22
2021-09-16 13:55:20.207  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : Connection established
2021-09-16 13:55:20.217  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : Remote version string: SSH-2.0-OpenSSH_6.6.1p1 Debian-4~bpo70+1
2021-09-16 13:55:20.217  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : Local version string: SSH-2.0-JSCH-0.1.54
2021-09-16 13:55:20.217  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : CheckCiphers: aes256-ctr,aes192-ctr,aes128-ctr,aes256-cbc,aes192-cbc,aes128-cbc,3des-ctr,arcfour,arcfour128,arcfour256
2021-09-16 13:55:20.220  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : CheckKexes: diffie-hellman-group14-sha1,ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521
2021-09-16 13:55:20.257  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : CheckSignatures: ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : SSH_MSG_KEXINIT sent
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : SSH_MSG_KEXINIT received
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: server: curve25519-sha256@libssh.org,diffie-hellman-group-exchange-sha256,diffie-hellman-group-exchange-sha1,diffie-hellman-group14-sha1,diffie-hellman-group-exchange-sha1,diffie-hellman-group1-sha1
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: server: ssh-rsa,ssh-ed25519
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: server: chacha20-poly1305@openssh.com,aes256-gcm@openssh.com,aes128-gcm@openssh.com,aes256-ctr,aes192-ctr,aes128-ctr,aes128-cbc
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: server: chacha20-poly1305@openssh.com,aes256-gcm@openssh.com,aes128-gcm@openssh.com,aes256-ctr,aes192-ctr,aes128-ctr,aes128-cbc
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: server: hmac-sha2-512-etm@openssh.com,hmac-sha2-256-etm@openssh.com,hmac-ripemd160-etm@openssh.com,umac-128-etm@openssh.com,hmac-sha2-512,hmac-sha2-256,hmac-ripemd160,umac-128@openssh.com,hmac-sha1-96
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: server: hmac-sha2-512-etm@openssh.com,hmac-sha2-256-etm@openssh.com,hmac-ripemd160-etm@openssh.com,umac-128-etm@openssh.com,hmac-sha2-512,hmac-sha2-256,hmac-ripemd160,umac-128@openssh.com,hmac-sha1-96
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: server: none,zlib@openssh.com
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: server: none,zlib@openssh.com
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: server: 
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: server: 
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: client: ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521,diffie-hellman-group14-sha1,diffie-hellman-group-exchange-sha256,diffie-hellman-group-exchange-sha1,diffie-hellman-group1-sha1
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: client: ssh-rsa,ssh-dss,ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: client: aes128-ctr,aes128-cbc,3des-ctr,3des-cbc,blowfish-cbc,aes192-ctr,aes192-cbc,aes256-ctr,aes256-cbc
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: client: aes128-ctr,aes128-cbc,3des-ctr,3des-cbc,blowfish-cbc,aes192-ctr,aes192-cbc,aes256-ctr,aes256-cbc
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: client: hmac-md5,hmac-sha1,hmac-sha2-256,hmac-sha1-96,hmac-md5-96
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: client: hmac-md5,hmac-sha1,hmac-sha2-256,hmac-sha1-96,hmac-md5-96
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: client: none
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: client: none
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: client: 
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: client: 
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: server->client aes128-ctr hmac-sha2-256 none
2021-09-16 13:55:20.258  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : kex: client->server aes128-ctr hmac-sha2-256 none
2021-09-16 13:55:20.275  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : SSH_MSG_KEXDH_INIT sent
2021-09-16 13:55:20.275  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : expecting SSH_MSG_KEXDH_REPLY
2021-09-16 13:55:20.296  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : ssh_rsa_verify: signature true
2021-09-16 13:55:20.298  INFO [] --- [scheduling-1]  o.s.i.s.s.DefaultSftpSessionFactory      : The authenticity of host 'myHost  ' can't be established.
RSA key fingerprint is 4d:fe:f9:35:08:20:2e:76:76:55:7a:1d:5d:5d:1c:90.
Are you sure you want to continue connecting?
2021-09-16 13:55:20.298  WARN [] --- [scheduling-1]  com.jcraft.jsch                          : Permanently added 'myHost  ' (RSA) to the list of known hosts.
2021-09-16 13:55:20.299  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : SSH_MSG_NEWKEYS sent
2021-09-16 13:55:20.299  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : SSH_MSG_NEWKEYS received
2021-09-16 13:55:20.300  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : SSH_MSG_SERVICE_REQUEST sent
2021-09-16 13:55:20.300  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : SSH_MSG_SERVICE_ACCEPT received
2021-09-16 13:55:20.307  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : Authentications that can continue: publickey,keyboard-interactive,password
2021-09-16 13:55:20.307  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : Next authentication method: publickey
2021-09-16 13:55:20.307  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : Authentications that can continue: keyboard-interactive,password
2021-09-16 13:55:20.307  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : Next authentication method: keyboard-interactive
2021-09-16 13:55:20.409  INFO [] --- [scheduling-1]  com.jcraft.jsch                          : Disconnecting from myHost   port 22
2021-09-16 13:55:20.419 ERROR [] --- [scheduling-1]  o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Problem occurred while synchronizing 'data' to local directory; nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.integration.util.PoolItemNotAvailableException: Failed to obtain pooled item
    ...
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.integration.util.PoolItemNotAvailableException: Failed to obtain pooled item
    ...
Caused by: org.springframework.integration.util.PoolItemNotAvailableException: Failed to obtain pooled item
    ...
Caused by: java.lang.IllegalStateException: failed to create SFTP Session
    ...
   Caused by: java.lang.IllegalStateException: failed to connect
    ...
   Caused by: com.jcraft.jsch.JSchException: Auth cancel
    ...
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.e.SourcePollingChannelAdapter      : stopped bean 'sftpInboundAdapter'; defined in: 'class path resource [job/config/MySftpConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : Removing {json-to-object-transformer} as a subscriber to the 'testSftpInboundFlow.channel#0' channel
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.integration.channel.DirectChannel    : Channel 'application-1.testSftpInboundFlow.channel#0' has 0 subscriber(s).
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : stopped bean 'testSftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [job/config/MySftpConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.errorChannel' has 0 subscriber(s).
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : stopped bean '_org.springframework.integration.errorLogger'
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : Removing {message-handler:MySftpConfiguration.handler.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.sftpInputChannel' has 1 subscriber(s).
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : stopped bean 'MySftpConfiguration.handler.serviceActivator'
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : Removing {message-handler:MySftpConfiguration.deleteLocalFileService.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.sftpInputChannel' has 0 subscriber(s).
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : stopped bean 'MySftpConfiguration.deleteLocalFileService.serviceActivator'
2021-09-16 13:55:28.855  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : Removing {service-activator:MySftpConfiguration.controlBus.serviceActivator} as a subscriber to the 'controlChannel' channel
2021-09-16 13:55:28.856  INFO [] --- [Catalina-utility-2]  o.s.integration.channel.DirectChannel    : Channel 'application-1.controlChannel' has 0 subscriber(s).
2021-09-16 13:55:28.856  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : stopped bean 'MySftpConfiguration.controlBus.serviceActivator'
2021-09-16 13:55:28.857  INFO [] --- [Catalina-utility-2]  o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService 'taskScheduler'
2021-09-16 13:55:28.858  INFO [] --- [Catalina-utility-2]  o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'

What am I doing wrong?

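Since you say that both tasks are started on the same thread, it looks like you are dealing with the latest Spring Boot (see https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#features.spring-integration). Spring Integration now relies on the auto-configured TaskScheduler, which has a single thread in its pool.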
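The effect can be reproduced without Spring. The following is only a minimal sketch using plain `java.util.concurrent`, not the Spring Integration internals: a "job" runs on a scheduler, queues a "poll" on the same scheduler, and waits on a latch for it. The class and method names are made up for illustration, and the 50 ms and 500 ms delays stand in for the poller delay and the 20 second latch timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SchedulerStarvationDemo {

    /**
     * Runs a "job" on a scheduler with the given pool size. The job queues a
     * "poll" on the same scheduler and waits up to 500 ms for the poll to
     * count down a latch. Returns true if the poll ran while the job waited.
     */
    public static boolean pollRanWhileJobWaited(int poolSize) throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(poolSize);
        try {
            CountDownLatch latch = new CountDownLatch(1);
            Runnable poll = latch::countDown;
            Future<Boolean> job = scheduler.submit(() -> {
                // Stands in for starting the adapter: the poll is queued on the same scheduler.
                scheduler.schedule(poll, 50, TimeUnit.MILLISECONDS);
                // Stands in for latch.await(20, TimeUnit.SECONDS) in getFile().
                return latch.await(500, TimeUnit.MILLISECONDS);
            });
            return job.get();
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // One thread: the job occupies it, so the poll cannot run until the job gives up.
        System.out.println("pool size 1: " + pollRanWhileJobWaited(1)); // false
        // Two threads: the poll runs on the second thread and releases the latch.
        System.out.println("pool size 2: " + pollRanWhileJobWaited(2)); // true
    }
}
```

With one thread the wait always times out, which matches the log above, where the JSch negotiation only begins on `scheduling-1` after the job has finished. In Spring Boot the scheduler pool size can presumably be raised through the `spring.task.scheduling.pool.size` property, assuming the auto-configured scheduler is the one in use.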

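You can change that configuration, or you can add a task-executor to the poller of the sftpInboundAdapter inbound channel adapter definition (see https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#taskexecutor-support). This way the real work is shifted from the scheduler thread to a thread provided by that executor.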
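The hand-off idea can be illustrated with plain `java.util.concurrent` as well. This is a sketch of the principle only, not of how Spring Integration implements it: the scheduler thread merely fires the trigger, and the actual work is passed to a separate executor. The class name and the thread name `sftp-poll-1` are hypothetical; `scheduling-1` is taken from the log above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PollerHandOffDemo {

    /** Returns the name of the thread that executed the simulated poll work. */
    public static String workerThreadName(boolean useExecutor) throws Exception {
        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "scheduling-1"));
        ExecutorService pollExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sftp-poll-1"));
        try {
            CompletableFuture<String> ranOn = new CompletableFuture<>();
            // The "real work": here it only records which thread it runs on.
            Runnable work = () -> ranOn.complete(Thread.currentThread().getName());
            // The trigger always fires on the scheduler thread; with an executor
            // it returns immediately after handing the work over.
            Runnable trigger = () -> {
                if (useExecutor) {
                    pollExecutor.execute(work);
                } else {
                    work.run();
                }
            };
            scheduler.schedule(trigger, 10, TimeUnit.MILLISECONDS);
            return ranOn.get(2, TimeUnit.SECONDS);
        } finally {
            scheduler.shutdownNow();
            pollExecutor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(workerThreadName(false)); // scheduling-1
        System.out.println(workerThreadName(true));  // sftp-poll-1
    }
}
```

In the flow from the question, the equivalent would presumably be something like `Pollers.fixedDelay(5000).taskExecutor(...)` on the adapter's poller. Note that in this sketch the trigger still needs a free scheduler thread in order to fire, so a scheduler thread that is blocked waiting on a latch would still delay it.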