在 Spring 批处理中实现 TaskExecutor 以进行并行处理

Implementation of TaskExecutor in Spring Batch for parallel processing

考虑一个 Step bean:

@Bean
  public Step stepForChunkProcessing() {
    return stepBuilderFactory
        .get("stepForChunkProcessing")
        .<Entity1, Entity2>chunk(1000)
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .taskExecutor(taskExecutor())
        .throttleLimit(10)
        .build();
  }
//@formatter:on

  @Bean
  public TaskExecutor taskExecutor(){
      return new SimpleAsyncTaskExecutor("MyApplication");
  }

要求: 在 Reader 中,它从文件中读取(实体 1 的)记录。 在 Processor 中,它处理,在 Writer 中,它写入数据库。

在任务执行器之前, 只创建了一个线程并且 它会在 Reader 和处理器中循环 1000 次,如上面的块设置中所定义的那样。 然后它会移动到 writer 并写入所有 1000 条记录。 同样,它将从记录号 1001 开始,然后在 Reader 和处理器中处理另外 1000 条记录。 这是同步执行。

TaskExecutor和throttle limit为10后,创建了10个相互独立的线程。 他们将如何维护文件中已被其他线程处理的记录数? 还要考虑如果我在 reader 的 Read 方法中提供同步关键字,那么不同的线程为什么会检查文件中已处理的记录?

这在多线程环境中是不可能的,如参考文档的 Multi-threaded 部分所述:

 Many participants in a Step (such as readers and writers) are stateful.
 If the state is not segregated by thread, then those components are not
 usable in a multi-threaded Step

这就是为什么文档提到要关闭 AbstractItemCountingItemStreamItemReader#setSaveState 的 javadoc 上的状态管理,这里是摘录:

Always set it to false if the reader is being used in a concurrent environment.