如何使用 Spring 与 Java 配置的集成每天通过 SFTP 获取文件?

How to get a file daily via SFTP using Spring Integration with Java config?

我需要每天通过 SFTP 获取一个文件。我想使用 Spring 与 Java 配置的集成。该文件通常在每天的特定时间可用。应用程序应尝试每天在该时间附近获取文件。如果文件不可用,它应该继续重试 x 次。在 x 次尝试后,它应该发送一封电子邮件让管理员知道该文件在 SFTP 站点上仍然不可用。

一种选择是使用 SftpInboundFileSynchronizingMessageSource。在 MessageHandler,我可以开始处理文件的工作。但是,我真的不需要与远程文件系统同步。毕竟是文件的预定交付。另外,我需要为下一次重试最多延迟 15 分钟,并且每 15 分钟轮询一次对于每日文件来说似乎有点过分了。我想我可以使用它,但需要一些机制在经过一定时间后发送电子邮件并且没有收到文件。

另一个选项似乎是使用 SFTP 出站网关的 get。但我能找到的唯一示例似乎是 XML 配置。

更新

使用以下 Artem Bilan 的回答提供的帮助后添加代码:

配置class:

@Bean
@InboundChannelAdapter(autoStartup="true", channel = "sftpChannel", poller = @Poller("pollerMetadata"))
public SftpInboundFileSynchronizingMessageSource sftpMessageSource(ApplicationProperties applicationProperties, PropertiesPersistingMetadataStore store) {
    SftpInboundFileSynchronizingMessageSource source =
            new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer(applicationProperties));
    source.setLocalDirectory(new File("ftp-inbound"));
    source.setAutoCreateLocalDirectory(true);
    FileSystemPersistentAcceptOnceFileListFilter local = new FileSystemPersistentAcceptOnceFileListFilter(store,"test");
    source.setLocalFilter(local);
    source.setCountsEnabled(true);        
    return source;
}

@Bean
public PollerMetadata pollerMetadata() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    List<Advice> adviceChain = new ArrayList<Advice>();
    adviceChain.add(retryCompoundTriggerAdvice());
    pollerMetadata.setAdviceChain(adviceChain);
    pollerMetadata.setTrigger(compoundTrigger());
    return pollerMetadata;
}

@Bean
public RetryCompoundTriggerAdvice retryCompoundTriggerAdvice() {
    return new RetryCompoundTriggerAdvice(compoundTrigger(), secondaryTrigger());
}

@Bean
public CompoundTrigger compoundTrigger() {
    CompoundTrigger compoundTrigger = new CompoundTrigger(primaryTrigger());
    return compoundTrigger;
}

@Bean
public Trigger primaryTrigger() {
    return new CronTrigger("*/60 * * * * *");
}

@Bean
public Trigger secondaryTrigger() {
    return new PeriodicTrigger(10000);
}

@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler(PropertiesPersistingMetadataStore store) {
    return new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println(message.getPayload());
            store.flush();
        }

    };
}

RetryCompoundTriggerAdvice class:

public class RetryCompoundTriggerAdvice extends AbstractMessageSourceAdvice {

    private final CompoundTrigger compoundTrigger;

    private final Trigger override;

    private int count = 0;

    public RetryCompoundTriggerAdvice(CompoundTrigger compoundTrigger, Trigger overrideTrigger) {
        Assert.notNull(compoundTrigger, "'compoundTrigger' cannot be null");
        this.compoundTrigger = compoundTrigger;
        this.override = overrideTrigger;
    }

    @Override
    public boolean beforeReceive(MessageSource<?> source) {
        return true;
    }

    @Override
    public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
        if (result == null && count <= 5) {
            count++;
            this.compoundTrigger.setOverride(this.override);
        }
        else {
            this.compoundTrigger.setOverride(null);
            if (count > 5) {
                 //send email
            }
            count = 0;
        }
        return result;
    }
}

自 Spring Integration 4.3 以来 CompoundTrigger:

* A {@link Trigger} that delegates the {@link #nextExecutionTime(TriggerContext)}
* to one of two Triggers. If the {@link #setOverride(Trigger) override} trigger is
* {@code null}, the primary trigger is invoked; otherwise the override trigger is
* invoked.

CompoundTriggerAdvice的组合:

* An {@link AbstractMessageSourceAdvice} that uses a {@link CompoundTrigger} to adjust
* the poller - when a message is present, the compound trigger's primary trigger is
* used to determine the next poll. When no message is present, the override trigger is
* used.

它可以用来完成你的任务:

primaryTrigger每天只能做一次CronTrigger到运行的任务。

override 可能是 PeriodicTrigger,需要短时间重试。

您可以将 retry 逻辑与另一个 Advice 一起用于轮询器,或者只是扩展 CompoundTriggerAdvice 以添加 count 逻辑以最终发送电子邮件。

由于没有文件,所以没有消息踢流。我们别无选择,除非绕过轮询器基础设施。