每次执行任务时如何强制 SftpInboundAdapter 连接并获取文件?

How do I force the SftpInboundAdapter to connect and get a file every time a task is executed?

这是我的场景:我每个月都会收到一个文件 my.sftp.server.de,事件数据为 JSON。该文件始终具有完全相同的名称。我知道什么时候提供文件。

然后文件将被提取、处理、删除并使用另一个显示处理时间戳的文件名进行备份。这是通过订阅的 MessageHandler deleteLocalFileService.

完成的

这是我的 Spring 集成 类:

@Autowired
private Executor taskExecutor;

private ConcurrentMap<String, String> metadata = new ConcurrentHashMap<>();

@Bean
public SessionFactory<LsEntry> sftpSessionFactory(TransferChannel transferChannel) {
    LOG.debug("sftpSessionFactory");
    DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
    ...
    return new CachingSessionFactory<LsEntry>(sf);
}

@Bean
public IntegrationFlow sftpInboundFlow(TransferContext context) {
    LOG.debug("sftpInboundFlow");
    return IntegrationFlows
        .from(Sftp.inboundAdapter(sftpSessionFactory(context.getChannel()))
                .preserveTimestamp(context.isPreserveTimestamp())
                .remoteDirectory(context.getRemoteDir())
                .regexFilter(context.getRemoteFilename())
                .deleteRemoteFiles(context.isRemoteRemove())
                .autoCreateLocalDirectory(context.isAutoCreateLocalDir())
                .localFilenameExpression(context.getLocalFilename())
                .localDirectory(new File(context.getLocalDir()))
                .localFilter(new FileSystemPersistentAcceptOnceFileListFilter(
                        new SimpleMetadataStore(metadata), "")),
            e -> e.id("sftpInboundAdapter")
                .autoStartup(false)
                .poller(Pollers.fixedDelay(15000).taskExecutor(taskExecutor))
            )
        .transform(Transformers.fromJson(Event[].class))
        .channel("sftpInputChannel")
        .get();
}

@Bean
public PublishSubscribeChannel sftpInputChannel() {
    LOG.debug("sftpInputChannel");
    return new PublishSubscribeChannel();
}

@Bean
@ServiceActivator(inputChannel = "sftpInputChannel")
public MessageHandler handler() {
    LOG.debug("MessageHandler-handler");
    return new MessageHandler() {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            LOG.debug("handleMessage message={}", message);
        }
    };
}

@Bean
@ServiceActivator(inputChannel = "sftpInputChannel")
public MessageHandler deleteLocalFileService() {
    LOG.debug("MessageHandler-deleteLocalFileService");
    return new MessageHandler() {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            File f1 = (File) message.getHeaders().get("file_originalFile");
            try {
                // create backup copy in another directory
                // (otherwise the remote and local filters avoid fetch of the (new) file again)
                File f2 = new File("/home/app/sftp-inbound-work/EventData.txt.SAVE." +
                        LocalDateTime.now().format(ISO_LOCAL_DATE_TIME));
                FileUtil.copyFile(f1, f2);
                LOG.debug("deleteLocalFileService backup={} created", f2);

                // remove received file
                if (!f1.delete()) {
                    throw new MessagingException("Could not remove local file, " + f1.getName());
                };
                metadata.clear();
                LOG.debug("deleteLocalFileService file={} removed", f1);
            }
            catch (IOException io) {
                LOG.error("Exception copying local file {}", io);
                throw new MessagingException("Could not backup local file, " + f1.getName());
            }
        }
    };
}

@Bean
public MessageChannel controlChannel() {
    LOG.debug("controlChannel");
    return new DirectChannel();
}

@Bean
@ServiceActivator(inputChannel = "controlChannel")
public ExpressionControlBusFactoryBean controlBus() {
    LOG.debug("controlBus");
    ExpressionControlBusFactoryBean expressionControlBusFactoryBean = new ExpressionControlBusFactoryBean();
    return expressionControlBusFactoryBean;
}

因为我知道何时从外部源提供事件数据文件,所以我有一个方法 runAsTask 作为 Spring 调度程序任务执行。该方法添加一个 ChannelInterceptor 以获取文件的内容,然后通过 ControlChannel 消息启动 Spring 集成集成,最后从接收到的文件中获取和 return 事件数据。

这是核心任务方法:getFile() 方法:

private static final int FILE_GET_TIMEOUT_SEC = 30;

@Autowired
AbstractMessageChannel sftpInputChannel;

@Autowired
MessageChannel controlChannel;

@Scheduled(cron = "${com.harry.potter.job.tasks.harry-potter-events-task.frequency}")
public void runAsTask() {

    if (env.isEnabled()) {
        LOG.info("runAsTask: Storing Event Data, frequency={}", env.getFrequency());
        jobLOG.info("---------------------------------------------------------------");
        jobLOG.info("Job started: Storing Event Data");
    } else {
        LOG.debug("runAsTask");
        LOG.debug("runAsTask {} not enabled: Storing Event Data, aborted.", jobId);
        return;
    }

    try {
        events = getFile();

        /* processing of the event data */
        
        jobLOG.info("Okay, send okay mail");
        sendOkayMail();

    } catch (Exception e) {
        LOG.error("{} failed! Exception: {}", jobId, e);
        jobLOG.info("Job has errors, send error mail");
        sendErrorMail();
    } finally {
        LOG.info("{} finished", jobId);
        jobLOG.info("Job finished");
    }
}

public List<Event> getFile() {
    LOG.debug("getFile");

    final List<Event> events = new ArrayList<>();

    CountDownLatch latch = new CountDownLatch(1);

    sftpInputChannel.addInterceptor(new ChannelInterceptor() {
        // capture the message and count down the latch
        @Override
        public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
            LOG.debug("postSend channel={}, sent={}", channel.toString(), sent);
            LOG.trace("postSend message={}", message.getPayload());

            // read and transform the content
            Arrays.stream((Event[]) message.getPayload()).forEach(e -> events.add(e));

            // signal: work has done
            latch.countDown();
            ChannelInterceptor.super.postSend(message, channel, sent);
        }
        
    });

    boolean sent = controlChannel.send(new GenericMessage<>("@sftpInboundAdapter.start()"));
    LOG.debug("getFile 'Start' control message sent successfully={}", sent);

    try {
        LOG.debug("getFile, waiting for the file to be received, timeout={}s", FILE_GET_TIMEOUT_SEC);
        if (latch.await(FILE_GET_TIMEOUT_SEC, TimeUnit.SECONDS)) {
            return events;
        }
    }
    catch (InterruptedException e) {
        LOG.warn("getFile, job was interrupted");
    }

    throw new IllegalStateException("expected file not available");
}

有两个问题:

  1. 多次添加ChannelInterceptor(见LOGpostSend channel=bean 'sftpInputChannel' ...)。我应该在哪里添加拦截器才能只有一个?或者如何检查是否已添加拦截器?

  2. 文件是第一次从 SFTP 获取(参见日志中的 com.jcraft.jsch 行)。所有后续任务周期都不再与 SFTP 主机进行协商。为了在每个周期获取文件,我被告知对由 Map 支持的 LocalDirectoryFilter 使用 SimpleMetadataStore 并在处理文件后清除此映射,我这样做了。每次任务 运行?

    时,我必须更改什么才能从 SFTP 获取文件

这里是两次启动任务的LOG:

2021-10-09 01:35:52.688  INFO [] --- [http-nio-8080-exec-220]  c.h.potter.job.MyJobApplication          : The following profiles are active: test,cdsclient
2021-10-09 01:35:53.960  WARN [] --- [http-nio-8080-exec-220]  o.s.boot.actuate.endpoint.EndpointId     : Endpoint ID 'service-registry' contains invalid characters, please migrate to a valid format.
2021-10-09 01:35:54.141  INFO [] --- [http-nio-8080-exec-220]  o.s.cloud.context.scope.GenericScope     : BeanFactory id=301a6a9f-3809-32e6-b15b-e640dd198297
2021-10-09 01:35:54.149  INFO [] --- [http-nio-8080-exec-220]  faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-10-09 01:35:54.155  INFO [] --- [http-nio-8080-exec-220]  faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-10-09 01:35:54.160  INFO [] --- [http-nio-8080-exec-220]  faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-10-09 01:35:54.282  INFO [] --- [http-nio-8080-exec-220]  trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-10-09 01:35:54.300  INFO [] --- [http-nio-8080-exec-220]  trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-10-09 01:35:54.301  INFO [] --- [http-nio-8080-exec-220]  trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-10-09 01:35:54.363  INFO [] --- [http-nio-8080-exec-220]  w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1668 ms
2021-10-09 01:35:54.967  WARN [] --- [http-nio-8080-exec-220]  c.n.c.sources.URLConfigurationSource     : No URLs will be polled as dynamic configuration sources.
2021-10-09 01:35:54.967  INFO [] --- [http-nio-8080-exec-220]  c.n.c.sources.URLConfigurationSource     : To enable URLs as dynamic configuration sources, define System property archaius.configurationSource.additionalUrls or make config.properties available on classpath.
2021-10-09 01:35:54.981  INFO [] --- [http-nio-8080-exec-220]  c.netflix.config.DynamicPropertyFactory  : DynamicPropertyFactory is initialized with configuration sources: com.netflix.config.ConcurrentCompositeConfiguration@db52778f
2021-10-09 01:35:55.158 DEBUG [] --- [http-nio-8080-exec-220]  c.l.c.c.c.sftpConfiguration              : sftpInputChannel
2021-10-09 01:35:55.224 DEBUG [] --- [http-nio-8080-exec-220]  c.l.c.c.c.sftpConfiguration              : controlChannel
2021-10-09 01:35:55.476 DEBUG [] --- [http-nio-8080-exec-220]  c.l.c.c.c.sftpConfiguration              : transferChannel
2021-10-09 01:35:55.477 DEBUG [] --- [http-nio-8080-exec-220]  c.l.c.c.c.sftpConfiguration              : transferContext
2021-10-09 01:35:55.477 DEBUG [] --- [http-nio-8080-exec-220]  c.l.c.c.c.sftpConfiguration              : sftpSessionFactory
2021-10-09 01:35:55.486 DEBUG [] --- [http-nio-8080-exec-220]  c.l.c.c.c.sftpConfiguration              : sftpInboundFlow
2021-10-09 01:35:55.587 DEBUG [] --- [http-nio-8080-exec-220]  c.l.c.c.c.sftpConfiguration              : MessageHandler-handler
2021-10-09 01:35:55.588 DEBUG [] --- [http-nio-8080-exec-220]  c.l.c.c.c.sftpConfiguration              : MessageHandler-deleteLocalFileService
2021-10-09 01:35:55.589 DEBUG [] --- [http-nio-8080-exec-220]  c.l.c.c.c.sftpConfiguration              : controlBus
2021-10-09 01:35:56.271  INFO [] --- [http-nio-8080-exec-220]  o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 'taskScheduler'
2021-10-09 01:35:56.335  INFO [] --- [http-nio-8080-exec-220]  o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-10-09 01:35:56.335  INFO [] --- [http-nio-8080-exec-220]  o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.errorChannel' has 1 subscriber(s).
2021-10-09 01:35:56.335  INFO [] --- [http-nio-8080-exec-220]  o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2021-10-09 01:35:56.335  INFO [] --- [http-nio-8080-exec-220]  o.s.i.endpoint.EventDrivenConsumer       : Adding {json-to-object-transformer} as a subscriber to the 'sftpInboundFlow.channel#0' channel
2021-10-09 01:35:56.335  INFO [] --- [http-nio-8080-exec-220]  o.s.integration.channel.DirectChannel    : Channel 'application-1.sftpInboundFlow.channel#0' has 1 subscriber(s).
2021-10-09 01:35:56.335  INFO [] --- [http-nio-8080-exec-220]  o.s.i.endpoint.EventDrivenConsumer       : started bean 'sftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'bean method sftpInboundFlow'
2021-10-09 01:35:56.335  INFO [] --- [http-nio-8080-exec-220]  o.s.i.endpoint.EventDrivenConsumer       : Adding {message-handler:sftpConfiguration.handler.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-10-09 01:35:56.335  INFO [] --- [http-nio-8080-exec-220]  o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.sftpInputChannel' has 1 subscriber(s).
2021-10-09 01:35:56.335  INFO [] --- [http-nio-8080-exec-220]  o.s.i.endpoint.EventDrivenConsumer       : started bean 'sftpConfiguration.handler.serviceActivator'
2021-10-09 01:35:56.335  INFO [] --- [http-nio-8080-exec-220]  o.s.i.endpoint.EventDrivenConsumer       : Adding {message-handler:sftpConfiguration.deleteLocalFileService.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-10-09 01:35:56.335  INFO [] --- [http-nio-8080-exec-220]  o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.sftpInputChannel' has 2 subscriber(s).
2021-10-09 01:35:56.336  INFO [] --- [http-nio-8080-exec-220]  o.s.i.endpoint.EventDrivenConsumer       : started bean 'sftpConfiguration.deleteLocalFileService.serviceActivator'
2021-10-09 01:35:56.336  INFO [] --- [http-nio-8080-exec-220]  o.s.i.endpoint.EventDrivenConsumer       : Adding {service-activator:sftpConfiguration.controlBus.serviceActivator} as a subscriber to the 'controlChannel' channel
2021-10-09 01:35:56.336  INFO [] --- [http-nio-8080-exec-220]  o.s.integration.channel.DirectChannel    : Channel 'application-1.controlChannel' has 1 subscriber(s).
2021-10-09 01:35:56.336  INFO [] --- [http-nio-8080-exec-220]  o.s.i.endpoint.EventDrivenConsumer       : started bean 'sftpConfiguration.controlBus.serviceActivator'
2021-10-09 01:35:56.344  INFO [] --- [http-nio-8080-exec-220]  o.s.c.n.eureka.InstanceInfoFactory       : Setting initial instance status as: STARTING
2021-10-09 01:35:56.380  INFO [] --- [http-nio-8080-exec-220]  com.netflix.discovery.DiscoveryClient    : Initializing Eureka in region us-east-1
2021-10-09 01:35:56.449  INFO [] --- [http-nio-8080-exec-220]  c.n.d.provider.DiscoveryJerseyProvider   : Using JSON encoding codec LegacyJacksonJson
2021-10-09 01:35:56.449  INFO [] --- [http-nio-8080-exec-220]  c.n.d.provider.DiscoveryJerseyProvider   : Using JSON decoding codec LegacyJacksonJson
2021-10-09 01:35:56.550  INFO [] --- [http-nio-8080-exec-220]  c.n.d.provider.DiscoveryJerseyProvider   : Using XML encoding codec XStreamXml
2021-10-09 01:35:56.550  INFO [] --- [http-nio-8080-exec-220]  c.n.d.provider.DiscoveryJerseyProvider   : Using XML decoding codec XStreamXml
2021-10-09 01:35:56.722  INFO [] --- [http-nio-8080-exec-220]  c.n.d.s.r.aws.ConfigClusterResolver      : Resolving eureka endpoints via configuration
2021-10-09 01:35:56.743  INFO [] --- [http-nio-8080-exec-220]  com.netflix.discovery.DiscoveryClient    : Disable delta property : false
2021-10-09 01:35:56.743  INFO [] --- [http-nio-8080-exec-220]  com.netflix.discovery.DiscoveryClient    : Single vip registry refresh property : null
2021-10-09 01:35:56.743  INFO [] --- [http-nio-8080-exec-220]  com.netflix.discovery.DiscoveryClient    : Force full registry fetch : false
2021-10-09 01:35:56.743  INFO [] --- [http-nio-8080-exec-220]  com.netflix.discovery.DiscoveryClient    : Application is null : false
2021-10-09 01:35:56.743  INFO [] --- [http-nio-8080-exec-220]  com.netflix.discovery.DiscoveryClient    : Registered Applications size is zero : true
2021-10-09 01:35:56.743  INFO [] --- [http-nio-8080-exec-220]  com.netflix.discovery.DiscoveryClient    : Application version is -1: true
2021-10-09 01:35:56.743  INFO [] --- [http-nio-8080-exec-220]  com.netflix.discovery.DiscoveryClient    : Getting all instance registry info from the eureka server
2021-10-09 01:35:56.953  INFO [] --- [http-nio-8080-exec-220]  com.netflix.discovery.DiscoveryClient    : The response status is 200
2021-10-09 01:35:56.955  INFO [] --- [http-nio-8080-exec-220]  com.netflix.discovery.DiscoveryClient    : Not registering with Eureka server per configuration
2021-10-09 01:35:56.958  INFO [] --- [http-nio-8080-exec-220]  com.netflix.discovery.DiscoveryClient    : Discovery Client initialized at timestamp 1633736156957 with initial instances count: 69
2021-10-09 01:35:56.959  INFO [] --- [http-nio-8080-exec-220]  o.s.c.n.e.s.EurekaServiceRegistry        : Registering application MyJob with eureka with status UP
2021-10-09 01:35:56.986  INFO [] --- [http-nio-8080-exec-220]  c.h.potter.job.MyJobApplication          : Started MyJobApplication in 5.36 seconds (JVM running for 996008.867)
2021-10-09 01:36:00.000  INFO [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : runAsTask: Storing Event Data, frequency=0/30 * * * * *
2021-10-09 01:36:00.001  INFO [] --- [pool-240-thread-1]  job-MyJob                                : ---------------------------------------------------------------
2021-10-09 01:36:00.001  INFO [] --- [pool-240-thread-1]  job-MyJob                                : Job started: Storing Event Data
2021-10-09 01:36:00.001 DEBUG [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : getFile
2021-10-09 01:36:00.031  INFO [] --- [pool-240-thread-1]  o.s.i.e.SourcePollingChannelAdapter      : started bean 'sftpInboundAdapter'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'bean method sftpInboundFlow'
2021-10-09 01:36:00.033 DEBUG [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : getFile 'Start' control message sent successfully=true
2021-10-09 01:36:00.033 DEBUG [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : getFile, waiting for the file to be received, timeout=60s
2021-10-09 01:36:00.055  INFO [] --- [pool-240-thread-2]  com.jcraft.jsch                          : Connecting to my.sftp.server.de     port 22
2021-10-09 01:36:00.057  INFO [] --- [pool-240-thread-2]  com.jcraft.jsch                          : Connection established
...
2021-10-09 01:36:00.158  INFO [] --- [pool-240-thread-2]  o.s.i.s.s.DefaultSftpSessionFactory      : The authenticity of host 'my.sftp.server.de' can't be established.
RSA key fingerprint is 4d:fe:f9:35:08:20:2e:76:76:55:7a:1d:5d:5d:1c:90.
Are you sure you want to continue connecting?
2021-10-09 01:36:00.158  WARN [] --- [pool-240-thread-2]  com.jcraft.jsch                          : Permanently added 'my.sftp.server.de' (RSA) to the list of known hosts.
...
2021-10-09 01:36:00.385  INFO [] --- [pool-240-thread-2]  com.jcraft.jsch                          : Authentication succeeded (publickey).
2021-10-09 01:36:00.447 DEBUG [] --- [pool-240-thread-2]  c.l.c.c.c.sftpConfiguration              : handleMessage message=GenericMessage [payload=[Lcom.harry.potter.job.miscellaneous.Event;@24b4484b, headers={file_remoteHostPort=my.sftp.server.de:22, file_name=EventData.txt, file_remoteDirectory=/data, file_originalFile=/home/app/sftp-inbound-work/EventData.txt, id=fa7b1773-d522-9f0f-e8ed-8fcf3bebe23a, file_relativePath=EventData.txt, file_remoteFile=EventData.txt, timestamp=1633736160447}]
2021-10-09 01:36:00.447 DEBUG [] --- [pool-240-thread-2]  c.l.c.c.c.sftpConfiguration              : deleteLocalFileService file=/home/app/sftp-inbound-work/EventData.txt removed
2021-10-09 01:36:00.447 DEBUG [] --- [pool-240-thread-2]  c.l.c.job.MyJobTask                      : postSend channel=bean 'sftpInputChannel'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1de2b65c', sent=true
2021-10-09 01:36:00.448 DEBUG [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : createEvents
2021-10-09 01:36:01.384  INFO [] --- [pool-240-thread-1]  job-MyJob                                : Okay, send okay mail
2021-10-09 01:36:01.384 DEBUG [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : sendOkayMail
2021-10-09 01:36:01.512  INFO [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : MyJob finished
2021-10-09 01:36:01.512  INFO [] --- [pool-240-thread-1]  job-MyJob                                : Job finished
2021-10-09 01:36:30.001  INFO [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : runAsTask: Storing Event Data, frequency=0/30 * * * * *
2021-10-09 01:36:30.001  INFO [] --- [pool-240-thread-1]  job-MyJob                                : ---------------------------------------------------------------
2021-10-09 01:36:30.001  INFO [] --- [pool-240-thread-1]  job-MyJob                                : Job started: Storing Event Data
2021-10-09 01:36:30.001 DEBUG [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : getFile
2021-10-09 01:36:30.002 DEBUG [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : getFile 'Start' control message sent successfully=true
2021-10-09 01:36:30.002 DEBUG [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : getFile, waiting for the file to be received, timeout=60s
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-3]  c.l.c.c.c.sftpConfiguration              : handleMessage message=GenericMessage [payload=[Lcom.harry.potter.job.miscellaneous.Event;@b2cd829f, headers={file_remoteHostPort=my.sftp.server.de:22, file_name=EventData.txt, file_remoteDirectory=/data, file_originalFile=/home/app/sftp-inbound-work/EventData.txt, id=e6558e48-db84-fd2f-2195-c43d3e245127, file_relativePath=EventData.txt, file_remoteFile=EventData.txt, timestamp=1633736190062}]
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-3]  c.l.c.c.c.sftpConfiguration              : deleteLocalFileService file=/home/app/sftp-inbound-work/EventData.txt removed
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-3]  c.l.c.job.MyJobTask                      : postSend channel=bean 'sftpInputChannel'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1de2b65c', sent=true
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-3]  c.l.c.job.MyJobTask                      : postSend channel=bean 'sftpInputChannel'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1de2b65c', sent=true
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : createEvents
2021-10-09 01:36:30.394  INFO [] --- [pool-240-thread-1]  job-MyJob                                : Okay, send okay mail
2021-10-09 01:36:30.395 DEBUG [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : sendOkayMail
2021-10-09 01:36:30.457  INFO [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : MyJob finished
2021-10-09 01:36:30.457  INFO [] --- [pool-240-thread-1]  job-MyJob                                : Job finished
...
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.e.SourcePollingChannelAdapter      : stopped bean 'sftpInboundAdapter'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'bean method sftpInboundFlow'
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : Removing {json-to-object-transformer} as a subscriber to the 'sftpInboundFlow.channel#0' channel
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.integration.channel.DirectChannel    : Channel 'application-1.sftpInboundFlow.channel#0' has 0 subscriber(s).
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : stopped bean 'sftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'bean method sftpInboundFlow'
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.errorChannel' has 0 subscriber(s).
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : stopped bean '_org.springframework.integration.errorLogger'
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : Removing {message-handler:sftpConfiguration.handler.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.sftpInputChannel' has 1 subscriber(s).
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : stopped bean 'sftpConfiguration.handler.serviceActivator'
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : Removing {message-handler:sftpConfiguration.deleteLocalFileService.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.sftpInputChannel' has 0 subscriber(s).
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : stopped bean 'sftpConfiguration.deleteLocalFileService.serviceActivator'
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : Removing {service-activator:sftpConfiguration.controlBus.serviceActivator} as a subscriber to the 'controlChannel' channel
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.integration.channel.DirectChannel    : Channel 'application-1.controlChannel' has 0 subscriber(s).
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : stopped bean 'sftpConfiguration.controlBus.serviceActivator'
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService 'taskScheduler'
  1. 因为你需要一个倒计时锁存器,你每次都需要一个新的拦截器(或者每次都使用一个带有新锁存器的AtomicReference)。你可以打电话给removeInterceptor.

  2. 还有一个远程过滤器。

但是,对于这种情况,使用带 GET 命令的出站网关比使用异步入站通道适配器要容易得多。

https://docs.spring.io/spring-integration/docs/current/reference/html/sftp.html#sftp-outbound-gateway

编辑

示例:

@SpringBootApplication
@EnableScheduling
public class So69511643Application {

    public static void main(String[] args) {
        SpringApplication.run(So69511643Application.class, args);
    }

    @Bean
    DefaultSftpSessionFactory sf(@Value("${host}") String host, @Value("${username}") String user,
            @Value("${pw}") String pw) {

        DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
        sf.setHost(host);
        sf.setUser(user);
        sf.setPassword(pw);
        sf.setAllowUnknownKeys(true);
        return sf;
    }

    @Bean
    IntegrationFlow flow(DefaultSftpSessionFactory sf) {
        return IntegrationFlows.from(Gate.class)
                .handle(Sftp.outboundGateway(sf, Command.GET, "payload")
                        .localDirectoryExpression("'/tmp'"))
                .transform(new FileToStringTransformer())
                .transform(Transformers.fromJson(Foo.class))
                .get();
    }

}

interface Gate {

    Foo getFoo(String filename);

}

@Component
@DependsOn("flow")
class Scheduler {

    private static final Logger log = LoggerFactory.getLogger(So69511643Application.class);

    @Autowired
    Gate gate;

    @Scheduled(cron = "0 * * * * *")
    public void sched() {
        log.info("Fetched: {}", this.gate.getFoo("foo/bar.json"));
        new File("/tmp/bar.json").delete();
    }

}

EDIT2

使用 .route 获取和删除的示例:

@SpringBootApplication
@EnableScheduling
public class So69511643Application {

    public static void main(String[] args) {
        SpringApplication.run(So69511643Application.class, args);
    }

    @Bean
    DefaultSftpSessionFactory sf(@Value("${host}") String host, @Value("${username}") String user,
            @Value("${pw}") String pw, @Value("${pk}") Resource pk) {

        DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
        sf.setHost(host);
        sf.setUser(user);
//      sf.setPassword(pw);
        sf.setPrivateKey(pk);
        sf.setAllowUnknownKeys(true);
        return sf;
    }

    @Bean
    IntegrationFlow flow(DefaultSftpSessionFactory sf) {
        return f -> f
                .log()
                .route(Message.class, m -> m.getHeaders().get("method", String.class), r -> r
                        .subFlowMapping("getFoo", f1 -> f1
                            .handle(Sftp.outboundGateway(sf, Command.GET, "payload")
                                    .localDirectoryExpression("'/tmp'"))
                            .transform(new FileToStringTransformer())
                            .transform(Transformers.fromJson(Foo.class)))
                        .subFlowMapping("remove", f2 -> f2
                                .handle(Sftp.outboundGateway(sf, Command.RM, "payload"))));
    }

}

@MessagingGateway(defaultRequestChannel = "flow.input",
        defaultHeaders = @GatewayHeader(name = "method", expression = "#gatewayMethod.name"))
interface Gate {

    Foo getFoo(String filename);

    boolean remove(String filename);

}

@Component
@DependsOn("flow")
class Scheduler {

    private static final Logger log = LoggerFactory.getLogger(So69511643Application.class);

    @Autowired
    Gate gate;

    @Scheduled(cron = "0 * * * * *")
    public void sched() {
        log.info("Fetched: {}", this.gate.getFoo("foo/bar.json"));
        new File("/tmp/bar.json").delete();
        log.info("Deleted {}", this.gate.remove("foo/bar.json"));
    }

}

EDIT3

以上示例仅使用注释定义了网关代理 - 您不能在注释和 DSL 之间混合配置。下面显示了仅使用 DSL 的网关代理配置的相同内容。

@SpringBootApplication
@EnableScheduling
public class So69511643Application {

    public static void main(String[] args) {
        SpringApplication.run(So69511643Application.class, args);
    }

    @Bean
    DefaultSftpSessionFactory sf(@Value("${host}") String host, @Value("${username}") String user,
            @Value("${pw}") String pw, @Value("${pk}") Resource pk) {

        DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
        sf.setHost(host);
        sf.setUser(user);
//      sf.setPassword(pw);
        sf.setPrivateKey(pk);
        sf.setAllowUnknownKeys(true);
        return sf;
    }

    @Bean
    IntegrationFlow flow(DefaultSftpSessionFactory sf) {
        return IntegrationFlows.from(Gate.class, g -> g
                        .header("method", args -> args.getMethod().getName()))
                .log()
                .route(Message.class, m -> m.getHeaders().get("method", String.class), r -> r
                        .subFlowMapping("getFoo", f1 -> f1
                            .handle(Sftp.outboundGateway(sf, Command.GET, "payload")
                                    .localDirectoryExpression("'/tmp'"))
                            .transform(new FileToStringTransformer())
                            .transform(Transformers.fromJson(Foo.class)))
                        .subFlowMapping("remove", f2 -> f2
                                .handle(Sftp.outboundGateway(sf, Command.RM, "payload"))))
                .get();
    }

}

interface Gate {

    Foo getFoo(String filename);

    boolean remove(String filename);

}

@Component
@DependsOn("flow")
class Scheduler {

    private static final Logger log = LoggerFactory.getLogger(So69511643Application.class);

    @Autowired
    Gate gate;

    @Scheduled(cron = "0 * * * * *")
    public void sched() {
        log.info("Fetched: {}", this.gate.getFoo("foo/bar.json"));
        new File("/tmp/bar.json").delete();
        log.info("Deleted {}", this.gate.remove("foo/bar.json"));
    }

}