How to SFTP after session timeout using JSch and Spring Integration
I have an application that SFTPs files via SftpInboundFileSynchronizer:
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
    SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
    fileSynchronizer.setDeleteRemoteFiles(false);
    fileSynchronizer.setRemoteDirectory(applicationProperties.getSftpDirectory());
    CompositeFileListFilter<ChannelSftp.LsEntry> compositeFileListFilter = new CompositeFileListFilter<ChannelSftp.LsEntry>();
    compositeFileListFilter.addFilter(new SftpPersistentAcceptOnceFileListFilter(store, "sftp"));
    compositeFileListFilter.addFilter(new SftpSimplePatternFileListFilter(applicationProperties.getLoadFileNamePattern()));
    fileSynchronizer.setFilter(compositeFileListFilter);
    fileSynchronizer.setPreserveTimestamp(true);
    return fileSynchronizer;
}
The session factory is:
@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
    DefaultSftpSessionFactory sftpSessionFactory = new DefaultSftpSessionFactory();
    sftpSessionFactory.setHost(applicationProperties.getSftpHost());
    sftpSessionFactory.setPort(applicationProperties.getSftpPort());
    sftpSessionFactory.setUser(applicationProperties.getSftpUser());
    sftpSessionFactory.setPassword(applicationProperties.getSftpPassword());
    sftpSessionFactory.setAllowUnknownKeys(true);
    return new CachingSessionFactory<LsEntry>(sftpSessionFactory);
}
The SftpInboundFileSynchronizingMessageSource is set up to poll using a compound trigger.
@Bean
@InboundChannelAdapter(autoStartup="true", channel = "sftpChannel", poller = @Poller("pollerMetadata"))
public SftpInboundFileSynchronizingMessageSource sftpMessageSource() {
    SftpInboundFileSynchronizingMessageSource source =
            new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
    source.setLocalDirectory(applicationProperties.getScheduledLoadDirectory());
    source.setAutoCreateLocalDirectory(true);
    CompositeFileListFilter<File> compositeFileFilter = new CompositeFileListFilter<File>();
    compositeFileFilter.addFilter(new LastModifiedFileListFilter());
    compositeFileFilter.addFilter(new FileSystemPersistentAcceptOnceFileListFilter(store, "dailyfilesystem"));
    source.setLocalFilter(compositeFileFilter);
    source.setCountsEnabled(true);
    return source;
}
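The poller is set up with a compound trigger. It has a cron trigger that attempts to SFTP the expected file once a day. If the expected file is not found, a periodic trigger overrides the cron trigger and retries the SFTP every 15 minutes (up to 5 attempts). After the file is retrieved or 5 attempts have been made, the cron trigger takes over from the periodic trigger again. RetryCompoundTriggerAdvice contains this logic.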
@Bean
public PollerMetadata pollerMetadata(RetryCompoundTriggerAdvice retryCompoundTriggerAdvice) {
    PollerMetadata pollerMetadata = new PollerMetadata();
    List<Advice> adviceChain = new ArrayList<Advice>();
    adviceChain.add(retryCompoundTriggerAdvice);
    pollerMetadata.setAdviceChain(adviceChain);
    pollerMetadata.setTrigger(compoundTrigger());
    pollerMetadata.setMaxMessagesPerPoll(1);
    return pollerMetadata;
}

@Bean
public CompoundTrigger compoundTrigger() {
    CompoundTrigger compoundTrigger = new CompoundTrigger(primaryTrigger());
    return compoundTrigger;
}

@Bean
public CronTrigger primaryTrigger() {
    return new CronTrigger(applicationProperties.getSchedule());
}

@Bean
public PeriodicTrigger secondaryTrigger() {
    return new PeriodicTrigger(applicationProperties.getRetryInterval());
}
RetryCompoundTriggerAdvice
@Component
public class RetryCompoundTriggerAdvice extends AbstractMessageSourceAdvice {

    private static final Logger logger = LoggerFactory.getLogger(RetryCompoundTriggerAdvice.class);

    private final CompoundTrigger compoundTrigger;
    private final Trigger override;
    private final ApplicationProperties applicationProperties;
    private final Mail mail;
    private int attempts = 0;

    public RetryCompoundTriggerAdvice(CompoundTrigger compoundTrigger,
            @Qualifier("secondaryTrigger") Trigger override,
            ApplicationProperties applicationProperties,
            Mail mail) {
        this.compoundTrigger = compoundTrigger;
        this.override = override;
        this.applicationProperties = applicationProperties;
        this.mail = mail;
    }

    @Override
    public boolean beforeReceive(MessageSource<?> source) {
        return true;
    }

    @Override
    public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
        final int maxOverrideAttempts = applicationProperties.getMaxFileRetry();
        attempts++;
        if (result == null && attempts < maxOverrideAttempts) {
            logger.info("Unable to find load file after " + attempts + " attempt(s). Will reattempt");
            this.compoundTrigger.setOverride(this.override);
        } else if (result == null && attempts >= maxOverrideAttempts) {
            mail.sendAdminsEmail("Missing File");
            attempts = 0;
            this.compoundTrigger.setOverride(null);
        } else {
            attempts = 0;
            this.compoundTrigger.setOverride(null);
            logger.info("Found load file");
        }
        return result;
    }

    public void setOverrideTrigger() {
        this.compoundTrigger.setOverride(this.override);
    }

    public CompoundTrigger getCompoundTrigger() {
        return compoundTrigger;
    }
}
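To make the retry sequence easier to follow, here is a small stand-alone model of the decision made in afterReceive. It is only a sketch: the class name RetryDecisionModel, the afterPoll method and the RETRY / GIVE_UP / FOUND labels are hypothetical and not part of Spring Integration, and the boolean fileFound stands in for result != null. It uses no Spring classes, so it only traces the counter and override state.

```java
public class RetryDecisionModel {

    private final int maxAttempts;   // stands in for applicationProperties.getMaxFileRetry()
    private int attempts = 0;
    private boolean overrideActive = false;

    RetryDecisionModel(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** Mirrors the branch structure of afterReceive; fileFound stands in for result != null. */
    String afterPoll(boolean fileFound) {
        attempts++;
        if (!fileFound && attempts < maxAttempts) {
            overrideActive = true;    // periodic trigger overrides the cron trigger
            return "RETRY";
        }
        // Either the file arrived or the attempts are used up: back to the cron trigger.
        attempts = 0;
        overrideActive = false;
        return fileFound ? "FOUND" : "GIVE_UP";
    }

    boolean isOverrideActive() {
        return overrideActive;
    }

    public static void main(String[] args) {
        RetryDecisionModel model = new RetryDecisionModel(5);
        for (int poll = 1; poll <= 5; poll++) {
            System.out.println("poll " + poll + ": " + model.afterPoll(false));
        }
    }
}
```

With a maximum of 5, the model reports RETRY for the first four misses and GIVE_UP on the fifth, so the initial cron poll counts as the first of the 5 attempts.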
The problem is that when the application re-attempts the SFTP after 15 minutes, the server session has timed out. As a result, JSch reports:
2017-01-27 18:30:01.447 INFO 14248 --- [Connect thread my.sftp.com session] com.jcraft.jsch : Caught an exception, leaving main loop due to End of IO Stream Read
2017-01-27 18:30:01.448 INFO 14248 --- [Connect thread my.sftp.com session] com.jcraft.jsch : Disconnecting from my.sftp.com port 22
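Consequently, no more messages are received and the application does not attempt to SFTP the file again until the next cron time (i.e. the next day). How can I re-attempt the SFTP after waiting 15 minutes?
I tried calling setServerAliveInterval on the DefaultSftpSessionFactory, but with no luck. Note: after the file is retrieved or 5 attempts have been made, I want to stop communicating with the SFTP server until the next cron run.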
Update
I was finally able to run the test. Adding the debug logs as requested. Note: debug logging is set in the properties file as follows:
logging.level.org.springframework: DEBUG
logging.level.com.jcraft.jsch: DEBUG
Here are the logs:
2017-02-15 18:15:56.206 DEBUG 26748 --- [task-scheduler-9] o.s.i.e.SourcePollingChannelAdapter : Received no Message during the poll, returning 'false'
2017-02-15 18:16:56.211 DEBUG 26748 --- [task-scheduler-9] o.s.i.e.SourcePollingChannelAdapter : Received no Message during the poll, returning 'false'
2017-02-15 18:17:56.213 DEBUG 26748 --- [task-scheduler-8] o.s.i.e.SourcePollingChannelAdapter : Received no Message during the poll, returning 'false'
2017-02-15 18:18:56.214 DEBUG 26748 --- [task-scheduler-8] o.s.i.e.SourcePollingChannelAdapter : Received no Message during the poll, returning 'false'
2017-02-15 18:19:56.215 DEBUG 26748 --- [task-scheduler-8] o.s.i.e.SourcePollingChannelAdapter : Received no Message during the poll, returning 'false'
2017-02-15 18:20:56.215 DEBUG 26748 --- [task-scheduler-8] o.s.i.e.SourcePollingChannelAdapter : Received no Message during the poll, returning 'false'
2017-02-15 18:21:56.217 DEBUG 26748 --- [task-scheduler-8] o.s.i.e.SourcePollingChannelAdapter : Received no Message during the poll, returning 'false'
2017-02-15 18:22:56.218 DEBUG 26748 --- [task-scheduler-8] o.s.i.e.SourcePollingChannelAdapter : Received no Message during the poll, returning 'false'
2017-02-15 18:23:56.219 DEBUG 26748 --- [task-scheduler-3] o.s.i.e.SourcePollingChannelAdapter : Received no Message during the poll, returning 'false'
2017-02-15 18:24:56.221 DEBUG 26748 --- [task-scheduler-2] o.s.i.e.SourcePollingChannelAdapter : Received no Message during the poll, returning 'false'
2017-02-15 18:25:56.222 DEBUG 26748 --- [task-scheduler-6] o.s.i.e.SourcePollingChannelAdapter : Received no Message during the poll, returning 'false'
2017-02-15 18:26:56.223 DEBUG 26748 --- [task-scheduler-6] o.s.i.e.SourcePollingChannelAdapter : Received no Message during the poll, returning 'false'
2017-02-15 18:27:56.224 DEBUG 26748 --- [task-scheduler-6] o.s.i.e.SourcePollingChannelAdapter : Received no Message during the poll, returning 'false'
2017-02-15 18:28:56.225 DEBUG 26748 --- [task-scheduler-6] o.s.i.e.SourcePollingChannelAdapter : Received no Message during the poll, returning 'false'
2017-02-15 18:29:56.226 DEBUG 26748 --- [task-scheduler-6] o.s.i.e.SourcePollingChannelAdapter : Received no Message during the poll, returning 'false'
2017-02-15 18:30:03.884 INFO 26748 --- [Connect thread my.sftp.com session] com.jcraft.jsch : Caught an exception, leaving main loop due to End of IO Stream Read
2017-02-15 18:30:03.884 INFO 26748 --- [Connect thread my.sftp.com session] com.jcraft.jsch : Disconnecting from my.sftp.com port 22
2017-02-15 18:30:56.227 DEBUG 26748 --- [task-scheduler-6] o.s.i.e.SourcePollingChannelAdapter : Received no Message during the poll, returning 'false'
2017-02-15 18:31:56.228 DEBUG 26748 --- [task-scheduler-6] o.s.i.e.SourcePollingChannelAdapter : Received no Message during the poll, returning 'false'
2017-02-15 18:32:56.228 DEBUG 26748 --- [task-scheduler-6] o.s.i.e.SourcePollingChannelAdapter : Received no Message during the poll, returning 'false'
2017-02-15 18:33:56.230 DEBUG 26748 --- [task-scheduler-6] o.s.i.e.SourcePollingChannelAdapter : Received no Message during the poll, returning 'false'
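Even though I set the debug level, JSch does not log any debug statements. From the logs, it appears the SFTP connection was terminated. Note: the task-scheduler-6 thread polls every minute because of another poller used for file system synchronization.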
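The cache should detect that the session is no longer valid and hand out a new one. I would need to see DEBUG logs (for both Spring and JSch) to figure out what is happening.
However, if you only run once a day, there is really no need to cache sessions; just use the session factory directly and you will get a new session on each poll.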
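A minimal sketch of that suggestion, assuming the same Spring Integration generation as the question (the one in which DefaultSftpSessionFactory produces sessions typed on ChannelSftp.LsEntry). The class name UncachedSftpSessionConfig and the host, port and credential literals are hypothetical placeholders standing in for the applicationProperties values; the only intended change from the question's bean is that the factory is returned directly instead of being wrapped in CachingSessionFactory.

```java
import com.jcraft.jsch.ChannelSftp.LsEntry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;

@Configuration
public class UncachedSftpSessionConfig {

    @Bean
    public SessionFactory<LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
        // Placeholder values (assumptions); the question reads these from applicationProperties.
        factory.setHost("sftp.example.com");
        factory.setPort(22);
        factory.setUser("user");
        factory.setPassword("secret");
        factory.setAllowUnknownKeys(true);
        // No CachingSessionFactory wrapper: each poll asks this factory for a session directly.
        return factory;
    }
}
```

Without the cache there is no idle session for the server to time out between the 15-minute retries, at the cost of a fresh connection and login on every poll.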