Retrieve all files that match a filter once
I'm trying to get a count of the files my filter matches from my streaming inbound FTP adapter, so that after I have processed all of them I can launch a remote shell. Or is there some other way to know that the adapter has finished sending messages?
I have already tried overriding the public List<F> filterFiles(F[] files) method of CompositeFileListFilter, but it is never called.
For now I'm working with a fixed file count, but it should be dynamic.
This is the method I overrode on my CompositeFileListFilter:
@Override
public List<F> filterFiles(F[] files) {
    log.info("received {} files", files.length);
    return super.filterFiles(files);
}
I have the following integration flow, using an atomic counter that counts up to 3 (hard-coded for now, but it should be dynamic):
AtomicInteger messageCounter = new AtomicInteger(0);

return IntegrationFlows.from(Ftp.inboundStreamingAdapter(goldv5template())
                .remoteDirectory("/inputFolder")
                .filter(new CompositeFileListFilterWithCount<>() {{
                    addFilter(new FtpSimplePatternFileListFilter("pattern1.*"));
                    addFilter(new FtpSimplePatternFileListFilter("pattern2.*"));
                    addFilter(new FtpSimplePatternFileListFilter("pattern3.*"));
                }})
        , pollerConfiguration)
        .transform(Transformers.fromStream(StandardCharsets.UTF_8.toString()))
        .log(message -> "process file " + message.getHeaders().get(FileHeaders.REMOTE_FILE))
        .handle(message -> {
            int numericValue = messageCounter.incrementAndGet();
            log.info("numeric value: {}", numericValue);
            if (numericValue == 3) {
                messageCounter.set(0);
                log.info("launch remote shell here now");
            }
        }, e -> e.advice(after()))
        .get();
If I don't use the counter, I make one remote shell call per file, but I need to call it only once, when the whole flow has finished. The flow will be scheduled via a cron job, so I want a single call at the very end.
I'm testing with a 1-second fixed delay, but in production it will run only three times a day and has to fetch everything available on each run.
This is the poller configuration I'm using for the test:
sourcePollingChannelAdapterSpec -> sourcePollingChannelAdapterSpec.poller(pollerFactory -> pollerFactory.fixedRate(1000L))
Update
I tried Artem's suggestion, but I'm seeing strange behavior. I'm trying to fetch all the files in an FTP folder in a single poll, so, reading the documentation:
if the max-messages-per-poll is set to 1 (the default), it processes only one file at a time with intervals as defined by your trigger, essentially working as “one-poll === one-file”.
For typical file-transfer use cases, you most likely want the opposite behavior: to process all the files you can for each poll and only then wait for the next poll. If that is the case, set max-messages-per-poll to -1. Then, on each poll, the adapter tries to generate as many messages as it possibly can...
So I set max-messages-per-poll to -1 so that every poll gives me every file.
I added a filter to pick up only .xml files and prevent duplicates (an AcceptOnceFileListFilter), but the FTP streaming adapter keeps handing me the same files over and over, which makes no sense. I used a 10-second fixed delay for this test.
2019-07-23 10:32:04.308 INFO 9008 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : process2 file sample1.xml
2019-07-23 10:32:04.312 INFO 9008 --- [ scheduling-1] o.s.integration.ftp.session.FtpSession : File has been successfully transferred to: /output/sample1.xml
2019-07-23 10:32:04.313 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : Advice after handle.
2019-07-23 10:32:04.313 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : ________________________________
2019-07-23 10:32:04.315 INFO 9008 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : process file sample2.xml
2019-07-23 10:32:04.324 INFO 9008 --- [ scheduling-1] o.s.integration.ftp.session.FtpSession : File has been successfully transferred to: /output/sample2.xml
2019-07-23 10:32:04.324 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : Advice after handle.
2019-07-23 10:32:04.324 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : ________________________________
2019-07-23 10:32:04.326 INFO 9008 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : process file sample3.xml
2019-07-23 10:32:04.330 INFO 9008 --- [ scheduling-1] o.s.integration.ftp.session.FtpSession : File has been successfully transferred to: /output/sample3.xml
2019-07-23 10:32:04.331 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : Advice after handle.
2019-07-23 10:32:04.331 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : ________________________________
2019-07-23 10:32:04.333 INFO 9008 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : process file sample4.xml
2019-07-23 10:32:04.337 INFO 9008 --- [ scheduling-1] o.s.integration.ftp.session.FtpSession : File has been successfully transferred to: /output/sample4.xml
2019-07-23 10:32:04.338 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : Advice after handle.
2019-07-23 10:32:04.338 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : ________________________________
2019-07-23 10:32:04.341 INFO 9008 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : process file sample1.xml
2019-07-23 10:32:04.345 INFO 9008 --- [ scheduling-1] o.s.integration.ftp.session.FtpSession : File has been successfully transferred to: /output/sample1.xml
2019-07-23 10:32:04.346 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : Advice after handle.
2019-07-23 10:32:04.346 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : ________________________________
2019-07-23 10:32:04.347 INFO 9008 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : process file sample2.xml
2019-07-23 10:32:04.351 INFO 9008 --- [ scheduling-1] o.s.integration.ftp.session.FtpSession : File has been successfully transferred to: /output/sample2.xml
2019-07-23 10:32:04.351 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : Advice after handle.
2019-07-23 10:32:04.351 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : ________________________________
2019-07-23 10:32:04.353 INFO 9008 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : process file sample3.xml
2019-07-23 10:32:04.356 INFO 9008 --- [ scheduling-1] o.s.integration.ftp.session.FtpSession : File has been successfully transferred to: /output/sample3.xml
2019-07-23 10:32:04.356 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : Advice after handle.
2019-07-23 10:32:04.357 INFO 9008 --- [ scheduling-1] i.d.e.v.job.factory.TestFlowFactory : ________________________________
2019-07-23 10:32:04.358 INFO 9008 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : process file sample4.xml
...............................
return IntegrationFlows
        .from(Ftp.inboundStreamingAdapter(testFlowTemplate())
                        .remoteDirectory("/inputTestFlow")
                        .filter(new CompositeFileListFilter<>() {{
                            addFilter(new AcceptOnceFileListFilter<>());
                            addFilter(new FtpSimplePatternFileListFilter("*.xml"));
                        }})
                , sourcePollingChannelAdapterSpec -> sourcePollingChannelAdapterSpec.poller(pollerConfiguration.maxMessagesPerPoll(-1)))
        .transform(Transformers.fromStream(StandardCharsets.UTF_8.toString()))
        .log(message -> {
            execution.setStartDate(new Date());
            return "process file " + message.getHeaders().get(FileHeaders.REMOTE_FILE);
        })
        .handle(Ftp.outboundAdapter(FTPServers.PC_LOCAL.getFactory(), FileExistsMode.REPLACE)
                        .useTemporaryFileName(false)
                        .fileNameExpression("headers['" + FileHeaders.REMOTE_FILE + "']")
                        .remoteDirectory("/output/")
                , e -> e.advice(testFlowAfter())
        )
        .get();
Update 2
I got what I needed by creating this custom filter:
.filter(new FileListFilter<>() {
    private final Set<String> seenSet = new HashSet<>();
    private Date lastExecution;

    @Override
    public List<FTPFile> filterFiles(FTPFile[] files) {
        return Arrays.stream(files).filter(ftpFile -> {
            if (lastExecution != null && TimeUnit.MILLISECONDS.toSeconds(new Date().getTime() - lastExecution.getTime()) >= 10L) {
                this.seenSet.clear();
            }
            lastExecution = new Date();
            if (ftpFile.getName().endsWith(".xml")) {
                return this.seenSet.add(ftpFile.getRawListing());
            }
            return false;
        }).collect(Collectors.toList());
    }
})
But it relies on a hand-crafted 10-second interval, which is fine for my needs. Is there a smarter way to tie this code to the trigger?
I don't think a cron trigger is the right solution here, because you really want a single process for all the fetched files.
I think your logic in filterFiles() is wrong. You really want a counter of the files to be processed, not of the raw listing:
@Override
public List<F> filterFiles(F[] files) {
    List<F> filteredFiles = super.filterFiles(files);
    log.info("received {} files", filteredFiles.size());
    return filteredFiles;
}
Here is where you could indeed set a value into the messageCounter.
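As a minimal plain-Java sketch of that idea (no Spring classes; the class and method names are illustrative stand-ins, not the real API): filter first, then record how many files survived, so the downstream handler has a dynamic batch size to compare against instead of a hard-coded 3.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for a CompositeFileListFilter subclass: count the files that
// survive filtering, not the raw listing size, and publish that count so
// the handler knows when the current batch is complete.
class CountingFilter {
    private final AtomicInteger batchSize = new AtomicInteger();

    // Stand-in for filterFiles(F[] files): filter, then record the count.
    public List<String> filterFiles(String[] files) {
        List<String> filtered = new ArrayList<>();
        for (String f : files) {
            if (f.endsWith(".xml")) {   // stand-in for the pattern filters
                filtered.add(f);
            }
        }
        batchSize.set(filtered.size()); // the value the handler compares against
        return filtered;
    }

    public int getBatchSize() {
        return batchSize.get();
    }
}
```

The handler would then compare its running counter against getBatchSize() instead of the literal 3.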
Update
Filters have this feature:
/**
* Indicates that this filter supports filtering a single file.
* Filters that return true <b>must</b> override {@link #accept(Object)}.
* Default false.
* @return true to allow external calls to {@link #accept(Object)}.
* @since 5.2
* @see #accept(Object)
*/
default boolean supportsSingleFileFiltering() {
return false;
}
I think that when you override it to an explicit false in your CompositeFileListFilterWithCount, you should be good. Otherwise you are indeed right: by default a plain accept() is called for each file. That's because all your FtpSimplePatternFileListFilter instances come with true by default, and they all contribute true at the CompositeFileListFilterWithCount level.
Nevertheless, all of this tells us that you are already on Spring Integration 5.2 :-)...
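To see why the override matters, here is a plain-Java sketch of that dispatch. The interface only loosely mirrors the Spring contract and the caller is invented for illustration, not the real Spring Integration code: when supportsSingleFileFiltering() is true the source calls accept() per file and an overridden filterFiles() is bypassed; returning false forces the one-call-per-listing filterFiles() path.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative mirror of the FileListFilter contract (not the real API).
interface ListFilter<F> {
    List<F> filterFiles(F[] files);
    default boolean accept(F file) { throw new UnsupportedOperationException(); }
    default boolean supportsSingleFileFiltering() { return false; }
}

class FilterDispatchDemo {
    // Mimics how a message source chooses between the two filtering paths.
    static <F> List<F> applyFilter(ListFilter<F> filter, F[] files) {
        if (filter.supportsSingleFileFiltering()) {
            List<F> out = new ArrayList<>();
            for (F f : files) {
                if (filter.accept(f)) out.add(f); // per-file path: filterFiles() never runs
            }
            return out;
        }
        return filter.filterFiles(files); // whole-listing path: one call per poll
    }
}
```

With supportsSingleFileFiltering() returning false, filterFiles() is invoked exactly once per listing, which is what the counting override relies on.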
Update 2
Try a ChainFileListFilter instead. Place an AcceptOnceFileListFilter at the end of the chain. Although it might be even better to use an FtpPersistentAcceptOnceFileListFilter: it takes the file's lastmodified timestamp into account. Also consider including some LastModifiedFileListFilter variant for FTPFile in the chain. You have something similar in your custom filter, but it belongs in a separate filter.
I'm not sure, though, what you mean by making it based on the trigger. There is no relationship between filters and triggers, although you could share a common interval property between the trigger and the last-modified filter value.
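One way to share such an interval, sketched in plain Java with no Spring classes (names are illustrative, and the current time is passed in only to keep the sketch testable; real code would use System.currentTimeMillis()): the window is constructor-injected, so the same constant can be handed both to the poller's trigger and to the filter.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// "Accept once within a time window" as in the custom filter above, but
// with the window injected instead of hard-coded to 10 seconds.
class WindowedAcceptOnceFilter {
    private final Set<String> seen = new HashSet<>();
    private final long windowMillis;
    private long lastPoll = -1;

    WindowedAcceptOnceFilter(long windowMillis) {
        this.windowMillis = windowMillis; // same value you give the trigger
    }

    List<String> filterFiles(String[] files, long nowMillis) {
        if (lastPoll >= 0 && nowMillis - lastPoll >= windowMillis) {
            seen.clear(); // a new cycle started: forget the previous batch
        }
        lastPoll = nowMillis;
        return Arrays.stream(files)
                .filter(name -> name.endsWith(".xml"))
                .filter(seen::add) // accept-once within the window
                .collect(Collectors.toList());
    }
}
```

Wiring the same constant into both places keeps the filter's "forget" horizon and the poll interval from drifting apart.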
By the way, your story has drifted far from the original at once requirement. An Inbound Channel Adapter is really about one file per message, so you definitely can't get a list of files in a single message. For that you would need an FtpOutboundGateway with its LS or MGET commands, as I mentioned in the comments below.
Regarding "how do I get all the files in one message, or all the messages together?": you can try the max-messages-per-poll attribute. It means:
"The maximum number of messages produced for each poll. Defaults to infinity (indicated by -1) for polling consumers, and 1 for polled inbound channel adapters."