Spring 批处理集成 Java DSL 和 RunIdIncrementer 不递增

Spring Batch integration Java DSL and RunIdIncrementer not incrementing

我有一个 spring boot/integration/batch,它将 运行 和轮询 SFTP 上的文件。

我希望最终能够使用相同的参数(基本上是相同的文件)使用 RunIdIncrementer 在作业的配置中定义。

不幸的是 run.id=1 没有递增,我得到了 JobInstanceAlreadyCompleteException

作业配置

@Autowired
private JobBuilderFactory jobBuilders;

@Bean
public Job importOffersJob() {

    Job job = jobBuilders.get("importOffersJob")
            .start(importOffersStep)
            .listener(traceJobExecutionListener())  
            .incrementer(new RunIdIncrementer())
            .build();

    return job;
}

积分

@Bean
public IntegrationFlow ftpInboundFlow() {
    return IntegrationFlows
            .from(Sftp.inboundAdapter(SftpSessionFactory())
                .regexFilter(".*\.xml.mini$")
                .deleteRemoteFiles(intCfg.getSftpDeleteRemoteFiles())
                .preserveTimestamp(Boolean.TRUE)
                .autoCreateLocalDirectory(Boolean.TRUE)
                .remoteDirectory(intCfg.getSftpRemoteDirectory())
                .localDirectory(new File(intCfg.getSftpLocalDirectory())
            ), 
                e -> e.id("sftpInboundAdapter")
                .poller(Pollers.fixedRate(intCfg.getSftpPollerInMinutes(), TimeUnit.MINUTES).maxMessagesPerPoll(1)))
            .transform(fileToJobLaunchRequestTransformer())         
            .handle(jobLaunchingGw())
            .handle(logger())
            .get();
}

public class FileToJobLaunchRequestTransformer implements GenericTransformer<Message<File>, JobLaunchRequest> {

    private final static Logger log = LoggerFactory.getLogger(FileToJobLaunchRequestTransformer.class);

    @Autowired
    @Qualifier("importOffersJob")
    private Job job;

    @Override
    public JobLaunchRequest transform(Message<File> source) {
        log.info("FileToJobLaunchRequestTransformer, source.getPayload().getAbsolutePath(): {}", source.getPayload()
                .getAbsolutePath());

        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
        jobParametersBuilder.addString("pathToFile", "file:" + source.getPayload().getAbsolutePath());
        //jobParametersBuilder.addString("UUID", UUID.randomUUID().toString());

        JobParameters jobParams = job.getJobParametersIncrementer().getNext(jobParametersBuilder.toJobParameters());

        return new JobLaunchRequest(job, jobParams);
        }
     }

如果我取消注释 jobParametersBuilder.addString("UUID", UUID.randomUUID().toString()); 它正在工作,但我认为关键是能够重用我的作业配置中定义的增量器,不是吗?

(当我 运行 将同一批次作为简单 spring 启动而不集成时,增量器正在工作)

非常感谢

RunIdIncrementer.getNext():

public JobParameters getNext(JobParameters parameters) {

    JobParameters params = (parameters == null) ? new JobParameters() : parameters;

    long id = params.getLong(key, 0L) + 1;
    return new JobParametersBuilder(params).addLong(key, id).toJobParameters();
}

您每次都在创建一个新的作业参数,因此它总是 return 1 因为 run.id 不存在。

如果您将 jobParametersBuilder 移动到变换器中的一个字段,与 job 一起,它将起作用,但仅适用于您的应用程序的实例化。下次您启动您的应用程序时,它将再次从 1 开始。

为了在重启后继续存在,您需要在某处保存 run.id 值,或者您需要从存储库中获取最后的作业参数。