Spring IntegrationFlow CompositeFileListFilter Not Working

I have two filters, regexFilter and lastModified.

return IntegrationFlows.from(Sftp.inboundAdapter(inboundSftp)
            .localDirectory(this.getlocalDirectory(config.getId()))
            .deleteRemoteFiles(true)
            .autoCreateLocalDirectory(true)
            .regexFilter(config.getRegexFilter())
            .filter(new LastModifiedLsEntryFileListFilter())
            .remoteDirectory(config.getInboundDirectory())
            , e -> e.poller(Pollers.fixedDelay(60_000)
                    .errorChannel(MessageHeaders.ERROR_CHANNEL).errorHandler((ex) -> {

    })))

From searching on Google, I learned that I have to use a CompositeFileListFilter for the regex, so I changed my code to

.filter(new CompositeFileListFilter().addFilter(new RegexPatternFileListFilter(config.getRegexFilter())))

It compiled, but at run time it throws an error and the channel is stopped. The same error occurs with

.filter(ftpPersistantFilter(config.getRegexFilter()))
.
.
.

public CompositeFileListFilter ftpPersistantFilter(String regexFilter) {
        CompositeFileListFilter filters = new CompositeFileListFilter();
            filters.addFilter(new FtpRegexPatternFileListFilter(regexFilter));
        return filters;
    }
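
One possible cause of a failure that only shows up at run time, sketched with stand-in types rather than the Spring classes: RegexPatternFileListFilter works on java.io.File entries and FtpRegexPatternFileListFilter works on FTPFile entries, while the SFTP inbound adapter here hands its filter LsEntry objects. Because CompositeFileListFilter is used as a raw type, the compiler cannot flag the mismatch, and the cast fails only when the filter runs. The names EntryFilter, LocalFile, RemoteEntry, and LocalNameRegexFilter below are hypothetical and exist only for this illustration.

```java
public class RawFilterDemo {

    // Hypothetical stand-in for a generic file-list filter interface.
    interface EntryFilter<F> {
        boolean accept(F entry);
    }

    // Hypothetical stand-ins for a local file and a remote directory entry.
    static final class LocalFile {
        final String name;
        LocalFile(String name) { this.name = name; }
    }

    static final class RemoteEntry {
        final String filename;
        RemoteEntry(String filename) { this.filename = filename; }
    }

    // A regex filter written for LocalFile, analogous to plugging a
    // File-based filter into a place where remote entries are expected.
    static final class LocalNameRegexFilter implements EntryFilter<LocalFile> {
        private final java.util.regex.Pattern pattern;

        LocalNameRegexFilter(String regex) {
            this.pattern = java.util.regex.Pattern.compile(regex);
        }

        @Override
        public boolean accept(LocalFile entry) {
            return pattern.matcher(entry.name).matches();
        }
    }

    // Applies a raw (untyped) filter to a remote entry. Compiles with only
    // an unchecked warning; reports the ClassCastException if one is thrown.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static String applyRaw(EntryFilter rawFilter, RemoteEntry entry) {
        try {
            return String.valueOf(rawFilter.accept(entry));
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    @SuppressWarnings("rawtypes")
    public static void main(String[] args) {
        EntryFilter raw = new LocalNameRegexFilter(".*\\.csv");
        System.out.println(applyRaw(raw, new RemoteEntry("orders.csv")));
    }
}
```

If this is what happens in the flow above, the SFTP-specific SftpRegexPatternFileListFilter would be the regex filter whose entry type matches the adapter, and declaring the composite with an explicit type parameter would let the compiler catch the mismatch.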

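I just want to filter based on the file name. There are 2 flows for the same remote folder, both polling with the same cron, but each should pick up only its own relevant files.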
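
To make the intent concrete, here is a minimal sketch of two name patterns splitting one directory listing between two flows. The file names and the patterns `orders_.*\.csv` and `invoices_.*\.csv` are made up for the example; in the real flows the pattern comes from config.getRegexFilter(). Full-name matching is assumed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class NamePartitionDemo {

    // Returns the names that fully match the given regex, preserving order.
    static List<String> select(List<String> names, String regex) {
        Pattern pattern = Pattern.compile(regex);
        List<String> selected = new ArrayList<>();
        for (String name : names) {
            if (pattern.matcher(name).matches()) {
                selected.add(name);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // Hypothetical listing of the shared remote folder.
        List<String> listing = Arrays.asList(
                "orders_1.csv", "invoices_1.csv", "orders_2.csv", "readme.txt");

        // Each flow applies its own pattern to the same listing.
        System.out.println(select(listing, "orders_.*\\.csv"));
        System.out.println(select(listing, "invoices_.*\\.csv"));
    }
}
```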

EDIT: Adding LastModifiedLsEntryFileListFilter. It works fine, but I am adding it as requested.

public class LastModifiedLsEntryFileListFilter implements FileListFilter<LsEntry> {

private final Logger log = LoggerFactory.getLogger(LastModifiedLsEntryFileListFilter.class);
private static final long DEFAULT_AGE = 60;

private volatile long age = DEFAULT_AGE;

private volatile Map<String, Long> sizeMap = new HashMap<String, Long>();


public long getAge() {
    return this.age;
}

public void setAge(long age) {
    setAge(age, TimeUnit.SECONDS);
}

public void setAge(long age, TimeUnit unit) {
    this.age = unit.toSeconds(age);
}

@Override
public List<LsEntry> filterFiles(LsEntry[] files) {

    List<LsEntry> list = new ArrayList<LsEntry>();

    long now = System.currentTimeMillis() / 1000;

    for (LsEntry file : files) {

        if (file.getAttrs()
                .isDir()) {
            continue;
        }
        String fileName = file.getFilename();
        Long currentSize = file.getAttrs().getSize();
        Long oldSize = sizeMap.get(fileName);

        if(oldSize == null || currentSize.longValue() != oldSize.longValue() ) {
            // putting size in map, will verify in next iteration of scheduler
            sizeMap.put(fileName, currentSize);
            log.info("[{}] old size [{}]  increased to [{}]...", file.getFilename(), oldSize, currentSize);
            continue;
        }

        int lastModifiedTime = file.getAttrs()
            .getMTime();

        if (lastModifiedTime + this.age <= now ) {
            list.add(file);
            sizeMap.remove(fileName);
        } else {
            log.info("File [{}] is still being uploaded...", file.getFilename());
        }
    }
    return list;
}

}

PS: While testing the regex filter, I removed LastModifiedLsEntryFileListFilter for simplicity. So my final flow is

return IntegrationFlows.from(Sftp.inboundAdapter(inboundSftp)
            .localDirectory(this.getlocalDirectory(config.getId()))
            .deleteRemoteFiles(true)
            .autoCreateLocalDirectory(true)
            .filter(new CompositeFileListFilter().addFilter(new RegexPatternFileListFilter(config.getRegexFilter())))
            //.filter(new LastModifiedLsEntryFileListFilter())
            .remoteDirectory(config.getInboundDirectory()),
            e -> e.poller(Pollers.fixedDelay(60_000)
                    .errorChannel(MessageHeaders.ERROR_CHANNEL).errorHandler((ex) -> {
                try {
                    this.destroy(String.valueOf(config.getId()));
                    configurationService.removeConfigurationChannelById(config.getId());
                    // logging here
                } catch (Exception ex1) {
                }
            }))).publishSubscribeChannel(s -> s
            .subscribe(f -> {

                f.handle(Sftp.outboundAdapter(outboundSftp)
                        .useTemporaryFileName(false)
                        .autoCreateDirectory(true)
                        .remoteDirectory(config.getOutboundDirectory()), c -> c.advice(startup.deleteFileAdvice()));

            })
            .subscribe(f -> {
                if (doArchive) {
                    f.handle(Sftp.outboundAdapter(inboundSftp)
                            .useTemporaryFileName(false)
                            .autoCreateDirectory(true)
                            .remoteDirectory(config.getInboundArchiveDirectory()));
                } else {
                    f.handle(m -> {
                    });
                }

            })
            .subscribe(f -> f
            .handle(m -> {

                // I am handling exception here
            })
            ))
            .get();

Here is the exception:

2020-01-27 21:36:55,731 INFO o.s.i.c.PublishSubscribeChannel - Channel 'application.2.subFlow#0.channel#0' has 0 subscriber(s).
2020-01-27 21:36:55,731 INFO o.s.i.e.EventDrivenConsumer - stopped 2.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#0
2020-01-27 21:36:55,731 INFO o.s.i.c.DirectChannel - Channel 'application.2.subFlow#2.channel#0' has 0 subscriber(s).
2020-01-27 21:36:55,731 INFO o.s.i.e.EventDrivenConsumer - stopped 2.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#1

EDIT: Passing the regex to LastModifiedLsEntryFileListFilter and handling it there worked for me. When I use any other RegexFilter inside CompositeFileListFilter, it gives an error.

.filter(new CompositeFileListFilter().addFilter(new LastModifiedLsEntryFileListFilter(config.getRegexFilter())))

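Please show your final flow. I don't see you using LastModifiedLsEntryFileListFilter in the CompositeFileListFilter... You definitely can't use regexFilter() and filter() together - the last one wins. To avoid confusion, we suggest using filter() with a CompositeFileListFilter or ChainFileListFilter to combine all of them.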

Also, what is the error you are talking about?
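
The point in the comment above about regexFilter() and filter() can be sketched with plain Java stand-ins (these are not the Spring classes; AdapterSpec, all, and the file names are hypothetical). The sketch assumes each filter-setting call replaces the previously configured filter, so only the last call takes effect, whereas a single composite that accepts an entry only when every delegate accepts it keeps both checks.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class CompositeFilterDemo {

    // Hypothetical adapter spec: each filter-setting call overwrites the last.
    static final class AdapterSpec {
        private Predicate<String> filter = name -> true;

        AdapterSpec regexFilter(String regex) {
            this.filter = name -> name.matches(regex);
            return this;
        }

        AdapterSpec filter(Predicate<String> custom) {
            this.filter = custom;
            return this;
        }

        List<String> poll(List<String> listing) {
            return listing.stream().filter(filter).collect(Collectors.toList());
        }
    }

    // Composite: accepts a name only if every delegate accepts it.
    @SafeVarargs
    static Predicate<String> all(Predicate<String>... delegates) {
        return name -> Arrays.stream(delegates).allMatch(d -> d.test(name));
    }

    // Stand-in for a second, non-regex check such as a "finished uploading" test.
    static final Predicate<String> NOT_TEMP = name -> !name.endsWith(".tmp");

    // Two separate calls: the second replaces the regex filter.
    static List<String> lastWins(List<String> listing) {
        return new AdapterSpec().regexFilter("orders_.*").filter(NOT_TEMP).poll(listing);
    }

    // One call with a composite: both checks apply.
    static List<String> combined(List<String> listing) {
        Predicate<String> regex = name -> name.matches("orders_.*");
        return new AdapterSpec().filter(all(regex, NOT_TEMP)).poll(listing);
    }

    public static void main(String[] args) {
        List<String> listing = Arrays.asList("orders_1.csv", "orders_2.tmp", "invoices_1.csv");
        System.out.println(lastWins(listing));  // regex check is lost
        System.out.println(combined(listing));  // both checks applied
    }
}
```

In the real flow, the equivalent would be a single filter() call whose composite contains both the regex filter and LastModifiedLsEntryFileListFilter, with every member typed for the same entry class.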