Partitioned Spring Batch Step 重复相同成功的slave StepExecutions
Partitioned Spring Batch Step repeats the same successful slave StepExecutions
使用 Spring 批处理 3.0.4.RELEASE.
我将作业配置为使用分区步骤。从属步骤使用块大小 1。任务执行器中有六个线程。我 运行 这个测试使用了从六个到数百个不同的网格大小。我的网格大小是我期望的从属 StepExecutions 的数量 == 我的分区程序创建的 ExecutionContexts 的数量。
结果总是这样:
六个线程分别挑出六个不同的步骤执行,成功执行。然后 相同的六步执行 运行 在同一个线程中一次又一次 !
我注意到 RepeatTemplate.executeInternal(...) 中有一个永不结束的循环。它不断执行相同的 StepExecution 只是增加版本。
这里是 Java 配置代码:
@Bean
@StepScope
public RapRequestItemReader rapReader(
@Value("#{stepExecutionContext['" + RapJobConfig.LIST_OF_IDS_STEP_EXECUTION_CONTEXT_VAR + "']}") String listOfIds,
final @Value("#{stepExecutionContext['" + RapJobConfig.TIME_STEP_EXECUTION_CONTEXT_VAR + "']}") String timeString) {
final List<Asset> farms = Arrays.asList(listOfIds.split(",")).stream().map(intString -> assetDao.getById(Integer.valueOf(intString)))
.collect(Collectors.toList());
return new RapRequestItemReader(timeString, farms);
}
@Bean
public ItemProcessor<RapRequest, PullSuccess> rapProcessor() {
return rapRequest -> {
return rapPull.pull(rapRequest.timestamp, rapRequest.farms);
};
}
@Bean
public TaskletStep rapStep1(StepBuilderFactory stepBuilderFactory, RapRequestItemReader rapReader) {
return stepBuilderFactory.get(RAP_STEP_NAME)
.<RapRequest, PullSuccess> chunk(RAP_STEP_CHUNK_SIZE)
.reader(rapReader)
.processor(rapProcessor())
.writer(updateCoverageWriter)
.build();
}
private RapFilePartitioner createRapFilePartitioner(RapParameter rapParameter) {
RapFilePartitioner partitioner = new RapFilePartitioner(rapParameter, rapPull.getIncrementHours());
return partitioner;
}
@Bean
public ThreadPoolTaskExecutor pullExecutor() {
ThreadPoolTaskExecutor pullExecutor = new ThreadPoolTaskExecutor();
pullExecutor.setCorePoolSize(weatherConfig.getNumberOfThreadsPerModelType());
pullExecutor.setMaxPoolSize(weatherConfig.getNumberOfThreadsPerModelType());
pullExecutor.setAllowCoreThreadTimeOut(true);
return pullExecutor;
}
@Bean
@JobScope
public Step rapPartitionByTimestampStep(StepBuilderFactory stepBuilderFactory, @Value("#{jobParameters['config']}") String config,
TaskletStep rapStep1) {
RapParameter rapParameter = GsonHelper.fromJson(config, RapParameter.class);
int gridSize = calculateGridSize(rapParameter);
return stepBuilderFactory.get("rapPartitionByTimestampStep")
.partitioner(rapStep1)
.partitioner(RAP_STEP_NAME, createRapFilePartitioner(rapParameter))
.taskExecutor(pullExecutor())
.gridSize(gridSize)
.build();
}
@Bean
public Job rapJob(JobBuilderFactory jobBuilderFactory, Step rapPartitionByTimestampStep) {
return jobBuilderFactory.get(JOB_NAME)
.start(rapPartitionByTimestampStep)
.build();
}
虽然很难从问题中看出这一点,但问题出在reader。 ItemReader 从未 returning null。
在设计中,StepExecution 应该只处理一个项目。但是,在处理该项目后,ItemReader 再次 returning 同一项目而不是 returning null。
我通过在第二次调用 read 时让 ItemReader return 为空来修复它。
更好的设计可能是使用 TaskletStep 而不是 ChunkStep。
使用 Spring 批处理 3.0.4.RELEASE.
我将作业配置为使用分区步骤。从属步骤使用块大小 1。任务执行器中有六个线程。我 运行 这个测试使用了从六个到数百个不同的网格大小。我的网格大小是我期望的从属 StepExecutions 的数量 == 我的分区程序创建的 ExecutionContexts 的数量。
结果总是这样:
六个线程分别挑出六个不同的步骤执行,成功执行。然后 相同的六步执行 运行 在同一个线程中一次又一次 !
我注意到 RepeatTemplate.executeInternal(...) 中有一个永不结束的循环。它不断执行相同的 StepExecution 只是增加版本。
这里是 Java 配置代码:
@Bean
@StepScope
public RapRequestItemReader rapReader(
@Value("#{stepExecutionContext['" + RapJobConfig.LIST_OF_IDS_STEP_EXECUTION_CONTEXT_VAR + "']}") String listOfIds,
final @Value("#{stepExecutionContext['" + RapJobConfig.TIME_STEP_EXECUTION_CONTEXT_VAR + "']}") String timeString) {
final List<Asset> farms = Arrays.asList(listOfIds.split(",")).stream().map(intString -> assetDao.getById(Integer.valueOf(intString)))
.collect(Collectors.toList());
return new RapRequestItemReader(timeString, farms);
}
@Bean
public ItemProcessor<RapRequest, PullSuccess> rapProcessor() {
return rapRequest -> {
return rapPull.pull(rapRequest.timestamp, rapRequest.farms);
};
}
@Bean
public TaskletStep rapStep1(StepBuilderFactory stepBuilderFactory, RapRequestItemReader rapReader) {
return stepBuilderFactory.get(RAP_STEP_NAME)
.<RapRequest, PullSuccess> chunk(RAP_STEP_CHUNK_SIZE)
.reader(rapReader)
.processor(rapProcessor())
.writer(updateCoverageWriter)
.build();
}
private RapFilePartitioner createRapFilePartitioner(RapParameter rapParameter) {
RapFilePartitioner partitioner = new RapFilePartitioner(rapParameter, rapPull.getIncrementHours());
return partitioner;
}
@Bean
public ThreadPoolTaskExecutor pullExecutor() {
ThreadPoolTaskExecutor pullExecutor = new ThreadPoolTaskExecutor();
pullExecutor.setCorePoolSize(weatherConfig.getNumberOfThreadsPerModelType());
pullExecutor.setMaxPoolSize(weatherConfig.getNumberOfThreadsPerModelType());
pullExecutor.setAllowCoreThreadTimeOut(true);
return pullExecutor;
}
@Bean
@JobScope
public Step rapPartitionByTimestampStep(StepBuilderFactory stepBuilderFactory, @Value("#{jobParameters['config']}") String config,
TaskletStep rapStep1) {
RapParameter rapParameter = GsonHelper.fromJson(config, RapParameter.class);
int gridSize = calculateGridSize(rapParameter);
return stepBuilderFactory.get("rapPartitionByTimestampStep")
.partitioner(rapStep1)
.partitioner(RAP_STEP_NAME, createRapFilePartitioner(rapParameter))
.taskExecutor(pullExecutor())
.gridSize(gridSize)
.build();
}
@Bean
public Job rapJob(JobBuilderFactory jobBuilderFactory, Step rapPartitionByTimestampStep) {
return jobBuilderFactory.get(JOB_NAME)
.start(rapPartitionByTimestampStep)
.build();
}
虽然很难从问题中看出这一点,但问题出在reader。 ItemReader 从未 returning null。
在设计中,StepExecution 应该只处理一个项目。但是,在处理该项目后,ItemReader 再次 returning 同一项目而不是 returning null。
我通过在第二次调用 read 时让 ItemReader return 为空来修复它。
更好的设计可能是使用 TaskletStep 而不是 ChunkStep。