Sftp 入站适配器的 LastModifiedFileListFilter
LastModifiedFileListFilter for Sftp inbound adapter
我正在尝试实现 LastModifiedFileListFilter
,因为看起来 spring-integration-sftp 没有类似的过滤器,但对于 5.3.2 版本,我试图复制 LastModifiedFileListFilter
来自 spring-integration-file 但丢弃回调不起作用。这是我的实现:
@Slf4j
@Data
public class LastModifiedLsEntryFileListFilter implements DiscardAwareFileListFilter<ChannelSftp.LsEntry> {
private static final long ONE_SECOND = 1000;
private static final long DEFAULT_AGE = 30;
private volatile long age = DEFAULT_AGE;
@Nullable
private Consumer<ChannelSftp.LsEntry> discardCallback;
public LastModifiedLsEntryFileListFilter(final long age) {
this.age = age;
}
@Override
public List<ChannelSftp.LsEntry> filterFiles(final ChannelSftp.LsEntry[] files) {
final List<ChannelSftp.LsEntry> list = new ArrayList<>();
final long now = System.currentTimeMillis() / ONE_SECOND;
for (final ChannelSftp.LsEntry file : files) {
if (this.fileIsAged(file, now)) {
log.info("File [{}] is aged...", file.getFilename());
list.add(file);
} else if (this.discardCallback != null) {
log.info("File [{}] is still being uploaded...", file.getFilename());
this.discardCallback.accept(file);
}
}
return list;
}
@Override
public boolean accept(final ChannelSftp.LsEntry file) {
if (this.fileIsAged(file, System.currentTimeMillis() / ONE_SECOND)) {
return true;
}
else if (this.discardCallback != null) {
this.discardCallback.accept(file);
}
return false;
}
private boolean fileIsAged(final ChannelSftp.LsEntry file, final long now) {
return file.getAttrs().getMTime() + this.age <= now;
}
@Override
public void addDiscardCallback(@Nullable final Consumer<ChannelSftp.LsEntry> discardCallbackToSet) {
this.discardCallback = discardCallbackToSet;
}
}
过滤器能够正确识别文件的年龄并丢弃它,但不会重试该文件,我认为这是丢弃回调的一部分。
我想我的问题是如何设置丢弃回调以继续重试丢弃的文件,直到文件 ages
。谢谢
not retried which I believe is part of discard callback.
我想知道是什么让你这样想...
FileReadingMessageSource
及其 WatchService
选项的逻辑如下:
if (filter instanceof DiscardAwareFileListFilter) {
((DiscardAwareFileListFilter<File>) filter).addDiscardCallback(this.filesToPoll::add);
}
并不意味着 SFTP 实现是相似的。
无论如何都会重试:在下一次轮询中,将再次检查不接受的文件。
您可能没有显示您使用的其他过滤器,并且您的文件在此 LastModifiedLsEntryFileListFilter
之前被过滤掉了,例如与 SftpPersistentAcceptOnceFileListFilter
。您需要考虑将您的“last-modified”作为链中的第一个。
如果您不打算从外部支持丢弃回调,您可能根本不需要实现 DiscardAwareFileListFilter
。
我正在尝试实现 LastModifiedFileListFilter
,因为看起来 spring-integration-sftp 没有类似的过滤器,但对于 5.3.2 版本,我试图复制 LastModifiedFileListFilter
来自 spring-integration-file 但丢弃回调不起作用。这是我的实现:
@Slf4j
@Data
public class LastModifiedLsEntryFileListFilter implements DiscardAwareFileListFilter<ChannelSftp.LsEntry> {
private static final long ONE_SECOND = 1000;
private static final long DEFAULT_AGE = 30;
private volatile long age = DEFAULT_AGE;
@Nullable
private Consumer<ChannelSftp.LsEntry> discardCallback;
public LastModifiedLsEntryFileListFilter(final long age) {
this.age = age;
}
@Override
public List<ChannelSftp.LsEntry> filterFiles(final ChannelSftp.LsEntry[] files) {
final List<ChannelSftp.LsEntry> list = new ArrayList<>();
final long now = System.currentTimeMillis() / ONE_SECOND;
for (final ChannelSftp.LsEntry file : files) {
if (this.fileIsAged(file, now)) {
log.info("File [{}] is aged...", file.getFilename());
list.add(file);
} else if (this.discardCallback != null) {
log.info("File [{}] is still being uploaded...", file.getFilename());
this.discardCallback.accept(file);
}
}
return list;
}
@Override
public boolean accept(final ChannelSftp.LsEntry file) {
if (this.fileIsAged(file, System.currentTimeMillis() / ONE_SECOND)) {
return true;
}
else if (this.discardCallback != null) {
this.discardCallback.accept(file);
}
return false;
}
private boolean fileIsAged(final ChannelSftp.LsEntry file, final long now) {
return file.getAttrs().getMTime() + this.age <= now;
}
@Override
public void addDiscardCallback(@Nullable final Consumer<ChannelSftp.LsEntry> discardCallbackToSet) {
this.discardCallback = discardCallbackToSet;
}
}
过滤器能够正确识别文件的年龄并丢弃它,但不会重试该文件,我认为这是丢弃回调的一部分。
我想我的问题是如何设置丢弃回调以继续重试丢弃的文件,直到文件 ages
。谢谢
not retried which I believe is part of discard callback.
我想知道是什么让你这样想...
FileReadingMessageSource
及其 WatchService
选项的逻辑如下:
if (filter instanceof DiscardAwareFileListFilter) {
((DiscardAwareFileListFilter<File>) filter).addDiscardCallback(this.filesToPoll::add);
}
并不意味着 SFTP 实现是相似的。
无论如何都会重试:在下一次轮询中,将再次检查不接受的文件。
您可能没有显示您使用的其他过滤器,并且您的文件在此 LastModifiedLsEntryFileListFilter
之前被过滤掉了,例如与 SftpPersistentAcceptOnceFileListFilter
。您需要考虑将您的“last-modified”作为链中的第一个。
如果您不打算从外部支持丢弃回调,您可能根本不需要实现 DiscardAwareFileListFilter
。