Retrieve all files that match a filter once

I am trying to get a count of the files fetched by my streaming inbound FTP adapter, using my filter, so that after I have processed all of the files I can launch a remote shell. Or is there some other way to know that the adapter has finished sending messages?

I have already tried with a CompositeFileListFilter, overriding the public List<F> filterFiles(F[] files) method, but it is never called.

For now I am using a fixed number of files, but it should be dynamic.

I overrode this method on the CompositeFileListFilter:

@Override
public List<F> filterFiles(F[] files) {
    log.info("received {} files", files.length);
    return super.filterFiles(files);
}

I have the following integration flow, where I use an atomic counter up to 3 (hard-coded to 3 for now, but it should be dynamic):

AtomicInteger messageCounter = new AtomicInteger(0);
        return IntegrationFlows.from(Ftp.inboundStreamingAdapter(goldv5template())
                .remoteDirectory("/inputFolder")
                .filter(new CompositeFileListFilterWithCount<>() {{
                    addFilter(new FtpSimplePatternFileListFilter("pattern1.*"));
                    addFilter(new FtpSimplePatternFileListFilter("pattern2.*"));
                    addFilter(new FtpSimplePatternFileListFilter("pattern3.*"));
                }})
            , pollerConfiguration)
            .transform(Transformers.fromStream(StandardCharsets.UTF_8.toString()))
            .log(message -> "process file " + message.getHeaders().get(FileHeaders.REMOTE_FILE))
            .handle(message -> {
                int numericValue = messageCounter.incrementAndGet();
                log.info("numeric value: {}", numericValue);
                if (numericValue == 3) {
                    messageCounter.set(0);
                    log.info("launch remote shell here now"));                 
                }
            }, e -> e.advice(after()))
            .get();

If I don't use the counter, I make a remote shell call for every file, but I need to call it only once, and only when the whole flow is done. The flow is scheduled by a cron job, so I want to make that call just once at the end.

I am testing with a fixed delay of 1 second, but in reality it will only run three times a day, and it has to fetch all the files on each trigger.

This is the poller configuration I am using for testing:

sourcePollingChannelAdapterSpec -> sourcePollingChannelAdapterSpec.poller(pollerFactory -> pollerFactory.fixedRate(1000L))
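
The flow above also wires e.advice(after()) without showing that bean. A minimal sketch of what such an advice could look like, assuming it extends Spring Integration's AbstractRequestHandlerAdvice (the log message is a placeholder, and "log" stands for the enclosing class's logger):

import org.aopalliance.aop.Advice;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice;
import org.springframework.messaging.Message;

@Bean
public Advice after() {
    return new AbstractRequestHandlerAdvice() {

        @Override
        protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) {
            Object result = callback.execute(); // let the handler run first
            log.info("Advice after handle.");   // then do the "after" work
            return result;
        }
    };
}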

UPDATE

I tried Artem's suggestion, but I still see strange behavior. I am trying to fetch all the files in a given FTP folder in a single poll, so, reading the documentation:

if the max-messages-per-poll is set to 1 (the default), it processes only one file at a time with intervals as defined by your trigger, essentially working as “one-poll === one-file”.

For typical file-transfer use cases, you most likely want the opposite behavior: to process all the files you can for each poll and only then wait for the next poll. If that is the case, set max-messages-per-poll to -1. Then, on each poll, the adapter tries to generate as many messages as it possibly can...

So I set max-messages-per-poll to -1, so that every poll gives me every file. I also added a filter to fetch only .xml files and, to prevent duplicates, an AcceptOnceFileListFilter. But the FTP streaming adapter keeps giving me the same files over and over again, which makes no sense. I used a FixedDelay of 10 seconds for this test. Here is the log output:

2019-07-23 10:32:04.308  INFO 9008 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : process2 file sample1.xml
2019-07-23 10:32:04.312  INFO 9008 --- [   scheduling-1] o.s.integration.ftp.session.FtpSession   : File has been successfully transferred to: /output/sample1.xml
2019-07-23 10:32:04.313  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : Advice after handle.
2019-07-23 10:32:04.313  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : ________________________________
2019-07-23 10:32:04.315  INFO 9008 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : process file sample2.xml
2019-07-23 10:32:04.324  INFO 9008 --- [   scheduling-1] o.s.integration.ftp.session.FtpSession   : File has been successfully transferred to: /output/sample2.xml
2019-07-23 10:32:04.324  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : Advice after handle.
2019-07-23 10:32:04.324  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : ________________________________
2019-07-23 10:32:04.326  INFO 9008 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : process file sample3.xml
2019-07-23 10:32:04.330  INFO 9008 --- [   scheduling-1] o.s.integration.ftp.session.FtpSession   : File has been successfully transferred to: /output/sample3.xml
2019-07-23 10:32:04.331  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : Advice after handle.
2019-07-23 10:32:04.331  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : ________________________________
2019-07-23 10:32:04.333  INFO 9008 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : process file sample4.xml
2019-07-23 10:32:04.337  INFO 9008 --- [   scheduling-1] o.s.integration.ftp.session.FtpSession   : File has been successfully transferred to: /output/sample4.xml
2019-07-23 10:32:04.338  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : Advice after handle.
2019-07-23 10:32:04.338  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : ________________________________
2019-07-23 10:32:04.341  INFO 9008 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : process file sample1.xml
2019-07-23 10:32:04.345  INFO 9008 --- [   scheduling-1] o.s.integration.ftp.session.FtpSession   : File has been successfully transferred to: /output/sample1.xml
2019-07-23 10:32:04.346  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : Advice after handle.
2019-07-23 10:32:04.346  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : ________________________________
2019-07-23 10:32:04.347  INFO 9008 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : process file sample2.xml
2019-07-23 10:32:04.351  INFO 9008 --- [   scheduling-1] o.s.integration.ftp.session.FtpSession   : File has been successfully transferred to: /output/sample2.xml
2019-07-23 10:32:04.351  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : Advice after handle.
2019-07-23 10:32:04.351  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : ________________________________
2019-07-23 10:32:04.353  INFO 9008 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : process file sample3.xml
2019-07-23 10:32:04.356  INFO 9008 --- [   scheduling-1] o.s.integration.ftp.session.FtpSession   : File has been successfully transferred to: /output/sample3.xml
2019-07-23 10:32:04.356  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : Advice after handle.
2019-07-23 10:32:04.357  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : ________________________________
2019-07-23 10:32:04.358  INFO 9008 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : process file sample4.xml
...............................
return IntegrationFlows
            .from(Ftp.inboundStreamingAdapter(testFlowTemplate())
                    .remoteDirectory("/inputTestFlow")
                    .filter(new CompositeFileListFilter<>() {{
                        addFilter(new AcceptOnceFileListFilter<>());
                        addFilter(new FtpSimplePatternFileListFilter("*.xml"));
                    }})
                , sourcePollingChannelAdapterSpec -> sourcePollingChannelAdapterSpec.poller(pollerConfiguration.maxMessagesPerPoll(-1)))
            .transform(Transformers.fromStream(StandardCharsets.UTF_8.toString()))
            .log(message -> {
                execution.setStartDate(new Date());
                return "process file " + message.getHeaders().get(FileHeaders.REMOTE_FILE);
            })
            .handle(Ftp.outboundAdapter(FTPServers.PC_LOCAL.getFactory(), FileExistsMode.REPLACE)
                    .useTemporaryFileName(false)
                    .fileNameExpression("headers['" + FileHeaders.REMOTE_FILE + "']")
                    .remoteDirectory("/output/")
                , e -> e.advice(testFlowAfter())
            )
            .get();

UPDATE 2

I achieved the functionality I needed by creating this custom filter:

.filter(new FileListFilter<>() {
    private final Set<String> seenSet = new HashSet<>();
    private Date lastExecution;

    @Override
    public List<FTPFile> filterFiles(FTPFile[] files) {
        return Arrays.stream(files).filter(ftpFile -> {
            // clear the "seen" state when at least 10 seconds have passed since the previous poll
            if (lastExecution != null && TimeUnit.MILLISECONDS.toSeconds(new Date().getTime() - lastExecution.getTime()) >= 10L) {
                this.seenSet.clear();
            }
            lastExecution = new Date();
            if (ftpFile.getName().endsWith(".xml")) {
                // the raw listing line acts as the dedupe key (FTPFile does not implement equals()/hashCode())
                return this.seenSet.add(ftpFile.getRawListing());
            }
            return false;
        }).collect(Collectors.toList());
    }
})

But I used a hand-made 10-second interval, which is fine for my needs. Is there a smarter way to make this code better, based on the trigger?

I don't think a cron trigger is the right solution here, because you really want to have a single process for all the fetched files.

I think your logic in filterFiles() is flawed. You really want a counter for the number of files you are going to process, not the original amount:

@Override
public List<F> filterFiles(F[] files) {
    List<F> filteredFiles = super.filterFiles(files);
    log.info("received {} files", filteredFiles.size());
    return filteredFiles;
}

Here you indeed can set a value into the messageCounter.
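
A minimal sketch of how that could look, assuming the counter is injected into your CompositeFileListFilterWithCount (the constructor injection is my assumption, not from the original code):

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.integration.file.filters.CompositeFileListFilter;

public class CompositeFileListFilterWithCount<F> extends CompositeFileListFilter<F> {

    private final AtomicInteger messageCounter;

    public CompositeFileListFilterWithCount(AtomicInteger messageCounter) {
        this.messageCounter = messageCounter;
    }

    @Override
    public List<F> filterFiles(F[] files) {
        List<F> filteredFiles = super.filterFiles(files);
        // publish how many files will really be processed in this poll
        this.messageCounter.set(filteredFiles.size());
        return filteredFiles;
    }
}

The handle() step can then count down with messageCounter.decrementAndGet() and launch the remote shell when it reaches 0, instead of comparing against a hard-coded 3.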

UPDATE

The filter has this functionality:

/**
 * Indicates that this filter supports filtering a single file.
 * Filters that return true <b>must</b> override {@link #accept(Object)}.
 * Default false.
 * @return true to allow external calls to {@link #accept(Object)}.
 * @since 5.2
 * @see #accept(Object)
 */
default boolean supportsSingleFileFiltering() {
    return false;
}

I think you should be good when you override it to an explicit false in your CompositeFileListFilterWithCount. Otherwise you are indeed right: by default a plain accept() is called for each file, simply because all your FtpSimplePatternFileListFilter instances come with true by default, and they all contribute true at the CompositeFileListFilter level.
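
For example, in the custom composite:

@Override
public boolean supportsSingleFileFiltering() {
    return false; // force the framework to call filterFiles(F[]) with the whole listing
}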

Nevertheless, all of this tells us that you are already on Spring Integration 5.2 :-)...

UPDATE 2

Try a ChainFileListFilter instead, and place the AcceptOnceFileListFilter at the end of the chain. Although it might be better to use a FtpPersistentAcceptOnceFileListFilter: it takes the file's lastModified into account. Also consider including some LastModifiedFileListFilter variant for FTPFile into the chain. You have something similar in your custom filter, but it should be a separate filter.
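
A minimal sketch of such a chain, assuming an in-memory SimpleMetadataStore and a "ftp-" key prefix (a persistent MetadataStore would be the production choice, so the state survives a restart):

import org.apache.commons.net.ftp.FTPFile;
import org.springframework.integration.file.filters.ChainFileListFilter;
import org.springframework.integration.ftp.filters.FtpPersistentAcceptOnceFileListFilter;
import org.springframework.integration.ftp.filters.FtpSimplePatternFileListFilter;
import org.springframework.integration.metadata.SimpleMetadataStore;

ChainFileListFilter<FTPFile> chain = new ChainFileListFilter<>();
chain.addFilter(new FtpSimplePatternFileListFilter("*.xml"));
// accept-once goes last; it remembers file name plus lastModified in the
// MetadataStore and re-accepts a file only when its lastModified changes
chain.addFilter(new FtpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "ftp-"));

Pass this chain to .filter(...) in place of the custom filter above.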

However, I am not sure what you mean by making it based on the trigger. There is no relationship between filters and triggers at all. Of course, you could have some common interval property and adjust the last-modified filter value accordingly.

By the way, your story is now far from the original "at once" requirement. An inbound channel adapter is really about one file per message, so you definitely can't have a list of files in a single message. That is exactly what an FtpOutboundGateway with its LS or MGET commands gives you, as I mentioned in the comments below.
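
For illustration, a minimal sketch of such a gateway flow (the channel name, remote pattern, local directory and ftpSessionFactory() are placeholders, not from the question):

import java.io.File;
import java.util.List;

import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.Command;
import org.springframework.integration.ftp.dsl.Ftp;

IntegrationFlow mgetFlow = IntegrationFlows
        .from("triggerChannel") // e.g. a cron-scheduled trigger message arrives here
        .handle(Ftp.outboundGateway(ftpSessionFactory(), Command.MGET, "'/inputFolder/*.xml'")
                .localDirectory(new File("/tmp/ftp-download")))
        .handle(message -> {
            // MGET replies with ALL fetched files as one List payload
            List<?> files = (List<?>) message.getPayload();
            log.info("fetched {} files; launch the remote shell once here", files.size());
        })
        .get();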

Regarding "how do I get all files in one message, or all messages together?": you can try the max-messages-per-poll property. It means:

"每次轮询生成的最大消息数。默认为 无穷大(由 -1 表示)用于轮询消费者,1 用于轮询入站通道适配器。