如何使用 Spring 与 Java 配置的集成每天通过 SFTP 获取文件?
How to get a file daily via SFTP using Spring Integration with Java config?
我需要每天通过 SFTP 获取一个文件。我想使用 Spring 与 Java 配置的集成。该文件通常在每天的特定时间可用。应用程序应尝试每天在该时间附近获取文件。如果文件不可用,它应该继续重试 x 次。在 x 次尝试后,它应该发送一封电子邮件让管理员知道该文件在 SFTP 站点上仍然不可用。
一种选择是使用 SftpInboundFileSynchronizingMessageSource
。在 MessageHandler
,我可以开始处理文件的工作。但是,我真的不需要与远程文件系统同步。毕竟是文件的预定交付。另外,我需要为下一次重试最多延迟 15 分钟,并且每 15 分钟轮询一次对于每日文件来说似乎有点过分了。我想我可以使用它,但需要一些机制在经过一定时间后发送电子邮件并且没有收到文件。
另一个选项似乎是使用 SFTP 出站网关的 get
。但我能找到的唯一示例似乎是 XML 配置。
更新
使用以下 Artem Bilan 的回答提供的帮助后添加代码:
配置class:
@Bean
@InboundChannelAdapter(autoStartup="true", channel = "sftpChannel", poller = @Poller("pollerMetadata"))
public SftpInboundFileSynchronizingMessageSource sftpMessageSource(ApplicationProperties applicationProperties, PropertiesPersistingMetadataStore store) {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer(applicationProperties));
source.setLocalDirectory(new File("ftp-inbound"));
source.setAutoCreateLocalDirectory(true);
FileSystemPersistentAcceptOnceFileListFilter local = new FileSystemPersistentAcceptOnceFileListFilter(store,"test");
source.setLocalFilter(local);
source.setCountsEnabled(true);
return source;
}
@Bean
public PollerMetadata pollerMetadata() {
PollerMetadata pollerMetadata = new PollerMetadata();
List<Advice> adviceChain = new ArrayList<Advice>();
adviceChain.add(retryCompoundTriggerAdvice());
pollerMetadata.setAdviceChain(adviceChain);
pollerMetadata.setTrigger(compoundTrigger());
return pollerMetadata;
}
@Bean
public RetryCompoundTriggerAdvice retryCompoundTriggerAdvice() {
return new RetryCompoundTriggerAdvice(compoundTrigger(), secondaryTrigger());
}
@Bean
public CompoundTrigger compoundTrigger() {
CompoundTrigger compoundTrigger = new CompoundTrigger(primaryTrigger());
return compoundTrigger;
}
@Bean
public Trigger primaryTrigger() {
return new CronTrigger("*/60 * * * * *");
}
@Bean
public Trigger secondaryTrigger() {
return new PeriodicTrigger(10000);
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler(PropertiesPersistingMetadataStore store) {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
store.flush();
}
};
}
RetryCompoundTriggerAdvice class:
public class RetryCompoundTriggerAdvice extends AbstractMessageSourceAdvice {
private final CompoundTrigger compoundTrigger;
private final Trigger override;
private int count = 0;
public RetryCompoundTriggerAdvice(CompoundTrigger compoundTrigger, Trigger overrideTrigger) {
Assert.notNull(compoundTrigger, "'compoundTrigger' cannot be null");
this.compoundTrigger = compoundTrigger;
this.override = overrideTrigger;
}
@Override
public boolean beforeReceive(MessageSource<?> source) {
return true;
}
@Override
public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
if (result == null && count <= 5) {
count++;
this.compoundTrigger.setOverride(this.override);
}
else {
this.compoundTrigger.setOverride(null);
if (count > 5) {
//send email
}
count = 0;
}
return result;
}
}
自 Spring Integration 4.3 以来 CompoundTrigger
:
* A {@link Trigger} that delegates the {@link #nextExecutionTime(TriggerContext)}
* to one of two Triggers. If the {@link #setOverride(Trigger) override} trigger is
* {@code null}, the primary trigger is invoked; otherwise the override trigger is
* invoked.
与CompoundTriggerAdvice
的组合:
* An {@link AbstractMessageSourceAdvice} that uses a {@link CompoundTrigger} to adjust
* the poller - when a message is present, the compound trigger's primary trigger is
* used to determine the next poll. When no message is present, the override trigger is
* used.
它可以用来完成你的任务:
primaryTrigger
每天只能做一次CronTrigger
到运行的任务。
override
可能是 PeriodicTrigger
,需要短时间重试。
您可以将 retry
逻辑与另一个 Advice
一起用于轮询器,或者只是扩展 CompoundTriggerAdvice
以添加 count
逻辑以最终发送电子邮件。
由于没有文件,所以没有消息踢流。我们别无选择,除非绕过轮询器基础设施。
我需要每天通过 SFTP 获取一个文件。我想使用 Spring 与 Java 配置的集成。该文件通常在每天的特定时间可用。应用程序应尝试每天在该时间附近获取文件。如果文件不可用,它应该继续重试 x 次。在 x 次尝试后,它应该发送一封电子邮件让管理员知道该文件在 SFTP 站点上仍然不可用。
一种选择是使用 SftpInboundFileSynchronizingMessageSource
。在 MessageHandler
,我可以开始处理文件的工作。但是,我真的不需要与远程文件系统同步。毕竟是文件的预定交付。另外,我需要为下一次重试最多延迟 15 分钟,并且每 15 分钟轮询一次对于每日文件来说似乎有点过分了。我想我可以使用它,但需要一些机制在经过一定时间后发送电子邮件并且没有收到文件。
另一个选项似乎是使用 SFTP 出站网关的 get
。但我能找到的唯一示例似乎是 XML 配置。
更新
使用以下 Artem Bilan 的回答提供的帮助后添加代码:
配置class:
@Bean
@InboundChannelAdapter(autoStartup="true", channel = "sftpChannel", poller = @Poller("pollerMetadata"))
public SftpInboundFileSynchronizingMessageSource sftpMessageSource(ApplicationProperties applicationProperties, PropertiesPersistingMetadataStore store) {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer(applicationProperties));
source.setLocalDirectory(new File("ftp-inbound"));
source.setAutoCreateLocalDirectory(true);
FileSystemPersistentAcceptOnceFileListFilter local = new FileSystemPersistentAcceptOnceFileListFilter(store,"test");
source.setLocalFilter(local);
source.setCountsEnabled(true);
return source;
}
@Bean
public PollerMetadata pollerMetadata() {
PollerMetadata pollerMetadata = new PollerMetadata();
List<Advice> adviceChain = new ArrayList<Advice>();
adviceChain.add(retryCompoundTriggerAdvice());
pollerMetadata.setAdviceChain(adviceChain);
pollerMetadata.setTrigger(compoundTrigger());
return pollerMetadata;
}
@Bean
public RetryCompoundTriggerAdvice retryCompoundTriggerAdvice() {
return new RetryCompoundTriggerAdvice(compoundTrigger(), secondaryTrigger());
}
@Bean
public CompoundTrigger compoundTrigger() {
CompoundTrigger compoundTrigger = new CompoundTrigger(primaryTrigger());
return compoundTrigger;
}
@Bean
public Trigger primaryTrigger() {
return new CronTrigger("*/60 * * * * *");
}
@Bean
public Trigger secondaryTrigger() {
return new PeriodicTrigger(10000);
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler(PropertiesPersistingMetadataStore store) {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
store.flush();
}
};
}
RetryCompoundTriggerAdvice class:
public class RetryCompoundTriggerAdvice extends AbstractMessageSourceAdvice {
private final CompoundTrigger compoundTrigger;
private final Trigger override;
private int count = 0;
public RetryCompoundTriggerAdvice(CompoundTrigger compoundTrigger, Trigger overrideTrigger) {
Assert.notNull(compoundTrigger, "'compoundTrigger' cannot be null");
this.compoundTrigger = compoundTrigger;
this.override = overrideTrigger;
}
@Override
public boolean beforeReceive(MessageSource<?> source) {
return true;
}
@Override
public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
if (result == null && count <= 5) {
count++;
this.compoundTrigger.setOverride(this.override);
}
else {
this.compoundTrigger.setOverride(null);
if (count > 5) {
//send email
}
count = 0;
}
return result;
}
}
自 Spring Integration 4.3 以来 CompoundTrigger
:
* A {@link Trigger} that delegates the {@link #nextExecutionTime(TriggerContext)}
* to one of two Triggers. If the {@link #setOverride(Trigger) override} trigger is
* {@code null}, the primary trigger is invoked; otherwise the override trigger is
* invoked.
与CompoundTriggerAdvice
的组合:
* An {@link AbstractMessageSourceAdvice} that uses a {@link CompoundTrigger} to adjust
* the poller - when a message is present, the compound trigger's primary trigger is
* used to determine the next poll. When no message is present, the override trigger is
* used.
它可以用来完成你的任务:
primaryTrigger
每天只能做一次CronTrigger
到运行的任务。
override
可能是 PeriodicTrigger
,需要短时间重试。
您可以将 retry
逻辑与另一个 Advice
一起用于轮询器,或者只是扩展 CompoundTriggerAdvice
以添加 count
逻辑以最终发送电子邮件。
由于没有文件,所以没有消息踢流。我们别无选择,除非绕过轮询器基础设施。