Spring IntegrationFlow CompositeFileListFilter 不工作
Spring IntegrationFlow CompositeFileListFilter Not Working
我有两个过滤器 regexFilter 和 lastModified。
return IntegrationFlows.from(Sftp.inboundAdapter(inboundSftp)
.localDirectory(this.getlocalDirectory(config.getId()))
.deleteRemoteFiles(true)
.autoCreateLocalDirectory(true)
.regexFilter(config.getRegexFilter())
.filter(new LastModifiedLsEntryFileListFilter())
.remoteDirectory(config.getInboundDirectory())
, e -> e.poller(Pollers.fixedDelay(60_000)
.errorChannel(MessageHeaders.ERROR_CHANNEL).errorHandler((ex) -> {
})))
通过谷歌搜索,我了解到我必须使用 CompositeFileListFilter 作为正则表达式,因此将我的代码更改为
.filter(new CompositeFileListFilter().addFilter(new RegexPatternFileListFilter(config.getRegexFilter())))
它已编译,但在 运行 时间抛出错误并且频道弯曲并且同样的错误发生在
.filter(ftpPersistantFilter(config.getRegexFilter()))
.
.
.
public CompositeFileListFilter ftpPersistantFilter(String regexFilter) {
CompositeFileListFilter filters = new CompositeFileListFilter();
filters.addFilter(new FtpRegexPatternFileListFilter(regexFilter));
return filters;
}
我只想根据文件名进行过滤。同一个远程文件夹有 2 个流,并且都使用相同的 cron 进行轮询,但应该选择它们的相关文件。
编辑
添加最后一个 LastModifiedLsEntryFileListFilter。它工作正常,但根据要求添加。
public class LastModifiedLsEntryFileListFilter implements FileListFilter<LsEntry> {
private final Logger log = LoggerFactory.getLogger(LastModifiedLsEntryFileListFilter.class);
private static final long DEFAULT_AGE = 60;
private volatile long age = DEFAULT_AGE;
private volatile Map<String, Long> sizeMap = new HashMap<String, Long>();
public long getAge() {
return this.age;
}
public void setAge(long age) {
setAge(age, TimeUnit.SECONDS);
}
public void setAge(long age, TimeUnit unit) {
this.age = unit.toSeconds(age);
}
@Override
public List<LsEntry> filterFiles(LsEntry[] files) {
List<LsEntry> list = new ArrayList<LsEntry>();
long now = System.currentTimeMillis() / 1000;
for (LsEntry file : files) {
if (file.getAttrs()
.isDir()) {
continue;
}
String fileName = file.getFilename();
Long currentSize = file.getAttrs().getSize();
Long oldSize = sizeMap.get(fileName);
if(oldSize == null || currentSize.longValue() != oldSize.longValue() ) {
// putting size in map, will verify in next iteration of scheduler
sizeMap.put(fileName, currentSize);
log.info("[{}] old size [{}] increased to [{}]...", file.getFilename(), oldSize, currentSize);
continue;
}
int lastModifiedTime = file.getAttrs()
.getMTime();
if (lastModifiedTime + this.age <= now ) {
list.add(file);
sizeMap.remove(fileName);
} else {
log.info("File [{}] is still being uploaded...", file.getFilename());
}
}
return list;
}
}
PS :当我测试正则表达式过滤器时,为了简单起见,我删除了 LastModifiedLsEntryFileListFilter。所以我的最终流程是
return IntegrationFlows.from(Sftp.inboundAdapter(inboundSftp)
.localDirectory(this.getlocalDirectory(config.getId()))
.deleteRemoteFiles(true)
.autoCreateLocalDirectory(true)
.filter(new CompositeFileListFilter().addFilter(new RegexPatternFileListFilter(config.getRegexFilter())))
//.filter(new LastModifiedLsEntryFileListFilter())
.remoteDirectory(config.getInboundDirectory()),
e -> e.poller(Pollers.fixedDelay(60_000)
.errorChannel(MessageHeaders.ERROR_CHANNEL).errorHandler((ex) -> {
try {
this.destroy(String.valueOf(config.getId()));
configurationService.removeConfigurationChannelById(config.getId());
// // logging here
} catch (Exception ex1) {
}
}))).publishSubscribeChannel(s -> s
.subscribe(f -> {
f.handle(Sftp.outboundAdapter(outboundSftp)
.useTemporaryFileName(false)
.autoCreateDirectory(true)
.remoteDirectory(config.getOutboundDirectory()), c -> c.advice(startup.deleteFileAdvice()));
})
.subscribe(f -> {
if (doArchive) {
f.handle(Sftp.outboundAdapter(inboundSftp)
.useTemporaryFileName(false)
.autoCreateDirectory(true)
.remoteDirectory(config.getInboundArchiveDirectory()));
} else {
f.handle(m -> {
});
}
})
.subscribe(f -> f
.handle(m -> {
// I am handling exception here
})
))
.get();
这里有例外
2020-01-27 21:36:55,731 INFO o.s.i.c.PublishSubscribeChannel - Channel
'application.2.subFlow#0.channel#0' has 0 subscriber(s).
2020-01-27 21:36:55,731 INFO o.s.i.e.EventDrivenConsumer - stopped 2.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#0
2020-01-27 21:36:55,731 INFO o.s.i.c.DirectChannel - Channel 'application.2.subFlow#2.channel#0' has 0 subscriber(s).
2020-01-27 21:36:55,731 INFO o.s.i.e.EventDrivenConsumer - stopped 2.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#1
编辑
将正则表达式传递给 LastModifiedLsEntryFileListFilter 并处理后对我有用。当我在 CompositeFileListFilter 中使用任何其他 RegexFilter 时,它会出错。
.filter(new CompositeFileListFilter().addFilter(new LastModifiedLsEntryFileListFilter(config.getRegexFilter())))
请显示您的最终流程。我没有看到您在 CompositeFileListFilter
中使用 LastModifiedLsEntryFileListFilter
...您绝对不能同时使用 regexFilter()
和 filter()
- 最后一个获胜。为避免混淆,我们建议使用 filter()
并使用 CompositeFileListFilter
或 ChainFileListFilter
.
组合所有这些
另外请问你说的错误是什么。
我有两个过滤器 regexFilter 和 lastModified。
return IntegrationFlows.from(Sftp.inboundAdapter(inboundSftp)
.localDirectory(this.getlocalDirectory(config.getId()))
.deleteRemoteFiles(true)
.autoCreateLocalDirectory(true)
.regexFilter(config.getRegexFilter())
.filter(new LastModifiedLsEntryFileListFilter())
.remoteDirectory(config.getInboundDirectory())
, e -> e.poller(Pollers.fixedDelay(60_000)
.errorChannel(MessageHeaders.ERROR_CHANNEL).errorHandler((ex) -> {
})))
通过谷歌搜索,我了解到我必须使用 CompositeFileListFilter 作为正则表达式,因此将我的代码更改为
.filter(new CompositeFileListFilter().addFilter(new RegexPatternFileListFilter(config.getRegexFilter())))
它已编译,但在 运行 时间抛出错误并且频道弯曲并且同样的错误发生在
.filter(ftpPersistantFilter(config.getRegexFilter()))
.
.
.
public CompositeFileListFilter ftpPersistantFilter(String regexFilter) {
CompositeFileListFilter filters = new CompositeFileListFilter();
filters.addFilter(new FtpRegexPatternFileListFilter(regexFilter));
return filters;
}
我只想根据文件名进行过滤。同一个远程文件夹有 2 个流,并且都使用相同的 cron 进行轮询,但应该选择它们的相关文件。
编辑 添加最后一个 LastModifiedLsEntryFileListFilter。它工作正常,但根据要求添加。
public class LastModifiedLsEntryFileListFilter implements FileListFilter<LsEntry> {
private final Logger log = LoggerFactory.getLogger(LastModifiedLsEntryFileListFilter.class);
private static final long DEFAULT_AGE = 60;
private volatile long age = DEFAULT_AGE;
private volatile Map<String, Long> sizeMap = new HashMap<String, Long>();
public long getAge() {
return this.age;
}
public void setAge(long age) {
setAge(age, TimeUnit.SECONDS);
}
public void setAge(long age, TimeUnit unit) {
this.age = unit.toSeconds(age);
}
@Override
public List<LsEntry> filterFiles(LsEntry[] files) {
List<LsEntry> list = new ArrayList<LsEntry>();
long now = System.currentTimeMillis() / 1000;
for (LsEntry file : files) {
if (file.getAttrs()
.isDir()) {
continue;
}
String fileName = file.getFilename();
Long currentSize = file.getAttrs().getSize();
Long oldSize = sizeMap.get(fileName);
if(oldSize == null || currentSize.longValue() != oldSize.longValue() ) {
// putting size in map, will verify in next iteration of scheduler
sizeMap.put(fileName, currentSize);
log.info("[{}] old size [{}] increased to [{}]...", file.getFilename(), oldSize, currentSize);
continue;
}
int lastModifiedTime = file.getAttrs()
.getMTime();
if (lastModifiedTime + this.age <= now ) {
list.add(file);
sizeMap.remove(fileName);
} else {
log.info("File [{}] is still being uploaded...", file.getFilename());
}
}
return list;
}
}
PS :当我测试正则表达式过滤器时,为了简单起见,我删除了 LastModifiedLsEntryFileListFilter。所以我的最终流程是
return IntegrationFlows.from(Sftp.inboundAdapter(inboundSftp)
.localDirectory(this.getlocalDirectory(config.getId()))
.deleteRemoteFiles(true)
.autoCreateLocalDirectory(true)
.filter(new CompositeFileListFilter().addFilter(new RegexPatternFileListFilter(config.getRegexFilter())))
//.filter(new LastModifiedLsEntryFileListFilter())
.remoteDirectory(config.getInboundDirectory()),
e -> e.poller(Pollers.fixedDelay(60_000)
.errorChannel(MessageHeaders.ERROR_CHANNEL).errorHandler((ex) -> {
try {
this.destroy(String.valueOf(config.getId()));
configurationService.removeConfigurationChannelById(config.getId());
// // logging here
} catch (Exception ex1) {
}
}))).publishSubscribeChannel(s -> s
.subscribe(f -> {
f.handle(Sftp.outboundAdapter(outboundSftp)
.useTemporaryFileName(false)
.autoCreateDirectory(true)
.remoteDirectory(config.getOutboundDirectory()), c -> c.advice(startup.deleteFileAdvice()));
})
.subscribe(f -> {
if (doArchive) {
f.handle(Sftp.outboundAdapter(inboundSftp)
.useTemporaryFileName(false)
.autoCreateDirectory(true)
.remoteDirectory(config.getInboundArchiveDirectory()));
} else {
f.handle(m -> {
});
}
})
.subscribe(f -> f
.handle(m -> {
// I am handling exception here
})
))
.get();
这里有例外
2020-01-27 21:36:55,731 INFO o.s.i.c.PublishSubscribeChannel - Channel
'application.2.subFlow#0.channel#0' has 0 subscriber(s).
2020-01-27 21:36:55,731 INFO o.s.i.e.EventDrivenConsumer - stopped 2.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#0
2020-01-27 21:36:55,731 INFO o.s.i.c.DirectChannel - Channel 'application.2.subFlow#2.channel#0' has 0 subscriber(s).
2020-01-27 21:36:55,731 INFO o.s.i.e.EventDrivenConsumer - stopped 2.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#1
编辑 将正则表达式传递给 LastModifiedLsEntryFileListFilter 并处理后对我有用。当我在 CompositeFileListFilter 中使用任何其他 RegexFilter 时,它会出错。
.filter(new CompositeFileListFilter().addFilter(new LastModifiedLsEntryFileListFilter(config.getRegexFilter())))
请显示您的最终流程。我没有看到您在 CompositeFileListFilter
中使用 LastModifiedLsEntryFileListFilter
...您绝对不能同时使用 regexFilter()
和 filter()
- 最后一个获胜。为避免混淆,我们建议使用 filter()
并使用 CompositeFileListFilter
或 ChainFileListFilter
.
另外请问你说的错误是什么。