Spring 批处理作业只应在 spring 集成文件轮询器轮询文件后执行一次
Spring batch job should be executed only once after the spring integration file poller polls the files
我正在尝试从一个或多个文件可能来自的系统文件夹中轮询文件,对于这些文件,我必须只触发一次批处理作业,而不是触发次数等于文件夹中的文件数。在我的情况下,我的批处理一次处理多个文件,我只希望轮询器将信号发送到批处理一次以开始其工作。
已尝试 poller.maxMessagesPerPoll(1) 等,但有些不同。我面临的问题是批处理作业被触发等于轮询器在轮询文件夹中获取的文件数。我只想执行一次批处理
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setJob(fileMessageBatchJob);
return fileMessageToJobRequest;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository);
simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
return jobLaunchingGateway;
}
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
System.out.println("&&&&&&&&&&&&&&&&&&Inside Integration Flow!!!!");
return IntegrationFlows
.from(Files.inboundAdapter(new File("C:\apps_data\recv")),
c -> c.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(1)))
.filter(onlyT4F2())
.handle(fileMessageToJobRequest)
.handle(jobLaunchingGateway)
.log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").get();
}
@Bean
public GenericSelector<File> onlyT4F2() {
System.out.println("@@@@@@@Inside GenericSelector of XXX");
return new GenericSelector<File>() {
@Override
public boolean accept(File source) {
return source.getName().contains("XXX");
}
};
}
当前行为 - 当轮询器在给定位置检测到 file/files 时,配置的批处理作业会触发多次。如果文件为 4,则批处理作业触发 4 次。
预期行为 - 文件轮询后,批处理作业应该只对任意数量的文件执行一次。由于批处理作业一次处理多个文件,因此不需要多次执行。
如果您需要我这边的任何其他信息,请告诉我。请优先提供帮助
您可以在只有 returns 个文件的入站通道适配器上使用自定义 FileListFilter
。
.filter(myFilterThatOnlyReturnsOneFile)
编辑
public class OnlyOneFileListFilter implements FileListFilter<File> {
@Override
public List<File> filterFiles(File[] files) {
return Collections.singletonList(files[0]);
}
}
return IntegrationFlows
.from(Files.inboundAdapter(new File("C:\apps_data\recv"))
.filter(new OnlyOneFileListFilter()),
c -> c.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(1)))
...
@Gary Russell - 问题已解决,仅如下使用 GenericSelector。谢谢你的帮助。在第一个 运行 上触发批处理作业后,它会处理所有现有文件并将其移动到其他文件夹,因此我添加了 file.exists() 并且它按照我的期望运行良好。但我观察到 1 小时后或有时即使在提供预期文件后轮询也没有发生,需要你的 help/suggestion 相同。
@Bean
public GenericSelector<File> triggerJobOnlyOnce() {
return new GenericSelector<File>() {
@Override
public boolean accept(File source) {
if(source.getName().contains("XXX") && source.exists())
return true;
return flag;
}
};
}
我正在尝试从一个或多个文件可能来自的系统文件夹中轮询文件,对于这些文件,我必须只触发一次批处理作业,而不是触发次数等于文件夹中的文件数。在我的情况下,我的批处理一次处理多个文件,我只希望轮询器将信号发送到批处理一次以开始其工作。
已尝试 poller.maxMessagesPerPoll(1) 等,但有些不同。我面临的问题是批处理作业被触发等于轮询器在轮询文件夹中获取的文件数。我只想执行一次批处理
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setJob(fileMessageBatchJob);
return fileMessageToJobRequest;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository);
simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
return jobLaunchingGateway;
}
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
System.out.println("&&&&&&&&&&&&&&&&&&Inside Integration Flow!!!!");
return IntegrationFlows
.from(Files.inboundAdapter(new File("C:\apps_data\recv")),
c -> c.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(1)))
.filter(onlyT4F2())
.handle(fileMessageToJobRequest)
.handle(jobLaunchingGateway)
.log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").get();
}
@Bean
public GenericSelector<File> onlyT4F2() {
System.out.println("@@@@@@@Inside GenericSelector of XXX");
return new GenericSelector<File>() {
@Override
public boolean accept(File source) {
return source.getName().contains("XXX");
}
};
}
当前行为 - 当轮询器在给定位置检测到 file/files 时,配置的批处理作业会触发多次。如果文件为 4,则批处理作业触发 4 次。
预期行为 - 文件轮询后,批处理作业应该只对任意数量的文件执行一次。由于批处理作业一次处理多个文件,因此不需要多次执行。
如果您需要我这边的任何其他信息,请告诉我。请优先提供帮助
您可以在只有 returns 个文件的入站通道适配器上使用自定义 FileListFilter
。
.filter(myFilterThatOnlyReturnsOneFile)
编辑
public class OnlyOneFileListFilter implements FileListFilter<File> {
@Override
public List<File> filterFiles(File[] files) {
return Collections.singletonList(files[0]);
}
}
return IntegrationFlows
.from(Files.inboundAdapter(new File("C:\apps_data\recv"))
.filter(new OnlyOneFileListFilter()),
c -> c.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(1)))
...
@Gary Russell - 问题已解决,仅如下使用 GenericSelector。谢谢你的帮助。在第一个 运行 上触发批处理作业后,它会处理所有现有文件并将其移动到其他文件夹,因此我添加了 file.exists() 并且它按照我的期望运行良好。但我观察到 1 小时后或有时即使在提供预期文件后轮询也没有发生,需要你的 help/suggestion 相同。
@Bean
public GenericSelector<File> triggerJobOnlyOnce() {
return new GenericSelector<File>() {
@Override
public boolean accept(File source) {
if(source.getName().contains("XXX") && source.exists())
return true;
return flag;
}
};
}