Partitioned Spring Batch Step 重复相同成功的slave StepExecutions

Partitioned Spring Batch Step repeats the same successful slave StepExecutions

使用 Spring 批处理 3.0.4.RELEASE.

我将作业配置为使用分区步骤。从属步骤使用块大小 1。任务执行器中有六个线程。我 运行 这个测试使用了从六个到数百个不同的网格大小。我的网格大小是我期望的从属 StepExecutions 的数量 == 我的分区程序创建的 ExecutionContexts 的数量。

结果总是这样:

六个线程分别挑出六个不同的步骤执行,成功执行。然后 相同的六步执行 运行 在同一个线程中一次又一次

我注意到 RepeatTemplate.executeInternal(...) 中有一个永不结束的循环。它不断执行相同的 StepExecution 只是增加版本。

这里是 Java 配置代码:

@Bean
@StepScope
public RapRequestItemReader rapReader(
        @Value("#{stepExecutionContext['" + RapJobConfig.LIST_OF_IDS_STEP_EXECUTION_CONTEXT_VAR + "']}") String listOfIds,
        final @Value("#{stepExecutionContext['" + RapJobConfig.TIME_STEP_EXECUTION_CONTEXT_VAR + "']}") String timeString) {
    final List<Asset> farms = Arrays.asList(listOfIds.split(",")).stream().map(intString -> assetDao.getById(Integer.valueOf(intString)))
            .collect(Collectors.toList());
    return new RapRequestItemReader(timeString, farms);
}

@Bean
public ItemProcessor<RapRequest, PullSuccess> rapProcessor() {
    return rapRequest -> {
        return rapPull.pull(rapRequest.timestamp, rapRequest.farms);
    };
}

@Bean
public TaskletStep rapStep1(StepBuilderFactory stepBuilderFactory, RapRequestItemReader rapReader) {
    return stepBuilderFactory.get(RAP_STEP_NAME)
            .<RapRequest, PullSuccess> chunk(RAP_STEP_CHUNK_SIZE)
            .reader(rapReader)
            .processor(rapProcessor())
            .writer(updateCoverageWriter)
            .build();
}

private RapFilePartitioner createRapFilePartitioner(RapParameter rapParameter) {
    RapFilePartitioner partitioner = new RapFilePartitioner(rapParameter, rapPull.getIncrementHours());
    return partitioner;
}

@Bean
public ThreadPoolTaskExecutor pullExecutor() {
    ThreadPoolTaskExecutor pullExecutor = new ThreadPoolTaskExecutor();
    pullExecutor.setCorePoolSize(weatherConfig.getNumberOfThreadsPerModelType());
    pullExecutor.setMaxPoolSize(weatherConfig.getNumberOfThreadsPerModelType());
    pullExecutor.setAllowCoreThreadTimeOut(true);
    return pullExecutor;
}

@Bean
@JobScope
public Step rapPartitionByTimestampStep(StepBuilderFactory stepBuilderFactory, @Value("#{jobParameters['config']}") String config,
        TaskletStep rapStep1) {
    RapParameter rapParameter = GsonHelper.fromJson(config, RapParameter.class);
    int gridSize = calculateGridSize(rapParameter);
    return stepBuilderFactory.get("rapPartitionByTimestampStep")
            .partitioner(rapStep1)
            .partitioner(RAP_STEP_NAME, createRapFilePartitioner(rapParameter))
            .taskExecutor(pullExecutor())
            .gridSize(gridSize)
            .build();
}

@Bean
public Job rapJob(JobBuilderFactory jobBuilderFactory, Step rapPartitionByTimestampStep) {
    return jobBuilderFactory.get(JOB_NAME)
            .start(rapPartitionByTimestampStep)
            .build();
}

虽然很难从问题中看出这一点,但问题出在reader。 ItemReader 从未 returning null。

在设计中,StepExecution 应该只处理一个项目。但是,在处理该项目后,ItemReader 再次 returning 同一项目而不是 returning null。

我通过在第二次调用 read 时让 ItemReader return 为空来修复它。

更好的设计可能是使用 TaskletStep 而不是 ChunkStep。