Spring 批处理集成 Java DSL 和 RunIdIncrementer 不递增
Spring Batch integration Java DSL and RunIdIncrementer not incrementing
我有一个 spring boot/integration/batch,它将 运行 和轮询 SFTP 上的文件。
我希望最终能够使用相同的参数(基本上是相同的文件)使用 RunIdIncrementer
在作业的配置中定义。
不幸的是 run.id=1 没有递增,我得到了 JobInstanceAlreadyCompleteException
作业配置
@Autowired
private JobBuilderFactory jobBuilders;
@Bean
public Job importOffersJob() {
Job job = jobBuilders.get("importOffersJob")
.start(importOffersStep)
.listener(traceJobExecutionListener())
.incrementer(new RunIdIncrementer())
.build();
return job;
}
积分
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(Sftp.inboundAdapter(SftpSessionFactory())
.regexFilter(".*\.xml.mini$")
.deleteRemoteFiles(intCfg.getSftpDeleteRemoteFiles())
.preserveTimestamp(Boolean.TRUE)
.autoCreateLocalDirectory(Boolean.TRUE)
.remoteDirectory(intCfg.getSftpRemoteDirectory())
.localDirectory(new File(intCfg.getSftpLocalDirectory())
),
e -> e.id("sftpInboundAdapter")
.poller(Pollers.fixedRate(intCfg.getSftpPollerInMinutes(), TimeUnit.MINUTES).maxMessagesPerPoll(1)))
.transform(fileToJobLaunchRequestTransformer())
.handle(jobLaunchingGw())
.handle(logger())
.get();
}
public class FileToJobLaunchRequestTransformer implements GenericTransformer<Message<File>, JobLaunchRequest> {
private final static Logger log = LoggerFactory.getLogger(FileToJobLaunchRequestTransformer.class);
@Autowired
@Qualifier("importOffersJob")
private Job job;
@Override
public JobLaunchRequest transform(Message<File> source) {
log.info("FileToJobLaunchRequestTransformer, source.getPayload().getAbsolutePath(): {}", source.getPayload()
.getAbsolutePath());
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
jobParametersBuilder.addString("pathToFile", "file:" + source.getPayload().getAbsolutePath());
//jobParametersBuilder.addString("UUID", UUID.randomUUID().toString());
JobParameters jobParams = job.getJobParametersIncrementer().getNext(jobParametersBuilder.toJobParameters());
return new JobLaunchRequest(job, jobParams);
}
}
如果我取消注释 jobParametersBuilder.addString("UUID", UUID.randomUUID().toString());
它正在工作,但我认为关键是能够重用我的作业配置中定义的增量器,不是吗?
(当我 运行 将同一批次作为简单 spring 启动而不集成时,增量器正在工作)
非常感谢
RunIdIncrementer.getNext()
:
public JobParameters getNext(JobParameters parameters) {
JobParameters params = (parameters == null) ? new JobParameters() : parameters;
long id = params.getLong(key, 0L) + 1;
return new JobParametersBuilder(params).addLong(key, id).toJobParameters();
}
您每次都在创建一个新的作业参数,因此它总是 return 1 因为 run.id
不存在。
如果您将 jobParametersBuilder
移动到变换器中的一个字段,与 job
一起,它将起作用,但仅适用于您的应用程序的实例化。下次您启动您的应用程序时,它将再次从 1 开始。
为了在重启后继续存在,您需要在某处保存 run.id
值,或者您需要从存储库中获取最后的作业参数。
我有一个 spring boot/integration/batch,它将 运行 和轮询 SFTP 上的文件。
我希望最终能够使用相同的参数(基本上是相同的文件)使用 RunIdIncrementer
在作业的配置中定义。
不幸的是 run.id=1 没有递增,我得到了 JobInstanceAlreadyCompleteException
作业配置
@Autowired
private JobBuilderFactory jobBuilders;
@Bean
public Job importOffersJob() {
Job job = jobBuilders.get("importOffersJob")
.start(importOffersStep)
.listener(traceJobExecutionListener())
.incrementer(new RunIdIncrementer())
.build();
return job;
}
积分
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(Sftp.inboundAdapter(SftpSessionFactory())
.regexFilter(".*\.xml.mini$")
.deleteRemoteFiles(intCfg.getSftpDeleteRemoteFiles())
.preserveTimestamp(Boolean.TRUE)
.autoCreateLocalDirectory(Boolean.TRUE)
.remoteDirectory(intCfg.getSftpRemoteDirectory())
.localDirectory(new File(intCfg.getSftpLocalDirectory())
),
e -> e.id("sftpInboundAdapter")
.poller(Pollers.fixedRate(intCfg.getSftpPollerInMinutes(), TimeUnit.MINUTES).maxMessagesPerPoll(1)))
.transform(fileToJobLaunchRequestTransformer())
.handle(jobLaunchingGw())
.handle(logger())
.get();
}
public class FileToJobLaunchRequestTransformer implements GenericTransformer<Message<File>, JobLaunchRequest> {
private final static Logger log = LoggerFactory.getLogger(FileToJobLaunchRequestTransformer.class);
@Autowired
@Qualifier("importOffersJob")
private Job job;
@Override
public JobLaunchRequest transform(Message<File> source) {
log.info("FileToJobLaunchRequestTransformer, source.getPayload().getAbsolutePath(): {}", source.getPayload()
.getAbsolutePath());
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
jobParametersBuilder.addString("pathToFile", "file:" + source.getPayload().getAbsolutePath());
//jobParametersBuilder.addString("UUID", UUID.randomUUID().toString());
JobParameters jobParams = job.getJobParametersIncrementer().getNext(jobParametersBuilder.toJobParameters());
return new JobLaunchRequest(job, jobParams);
}
}
如果我取消注释 jobParametersBuilder.addString("UUID", UUID.randomUUID().toString());
它正在工作,但我认为关键是能够重用我的作业配置中定义的增量器,不是吗?
(当我 运行 将同一批次作为简单 spring 启动而不集成时,增量器正在工作)
非常感谢
RunIdIncrementer.getNext()
:
public JobParameters getNext(JobParameters parameters) {
JobParameters params = (parameters == null) ? new JobParameters() : parameters;
long id = params.getLong(key, 0L) + 1;
return new JobParametersBuilder(params).addLong(key, id).toJobParameters();
}
您每次都在创建一个新的作业参数,因此它总是 return 1 因为 run.id
不存在。
如果您将 jobParametersBuilder
移动到变换器中的一个字段,与 job
一起,它将起作用,但仅适用于您的应用程序的实例化。下次您启动您的应用程序时,它将再次从 1 开始。
为了在重启后继续存在,您需要在某处保存 run.id
值,或者您需要从存储库中获取最后的作业参数。