在 Spring 批处理中实现 TaskExecutor 以进行并行处理
Implementation of TaskExecutor in Spring Batch for parallel processing
考虑一个 Step bean:
@Bean
public Step stepForChunkProcessing() {
return stepBuilderFactory
.get("stepForChunkProcessing")
.<Entity1, Entity2>chunk(1000)
.reader(reader())
.processor(processor())
.writer(writer())
.taskExecutor(taskExecutor())
.throttleLimit(10)
.build();
}
//@formatter:on
@Bean
public TaskExecutor taskExecutor(){
return new SimpleAsyncTaskExecutor("MyApplication");
}
要求:
在 Reader 中,它从文件中读取(实体 1 的)记录。
在 Processor 中,它处理,在 Writer 中,它写入数据库。
在任务执行器之前,
只创建了一个线程并且
它会在 Reader 和处理器中循环 1000 次,如上面的块设置中所定义的那样。
然后它会移动到 writer 并写入所有 1000 条记录。
同样,它将从记录号 1001 开始,然后在 Reader 和处理器中处理另外 1000 条记录。
这是同步执行。
TaskExecutor和throttle limit为10后,创建了10个相互独立的线程。
他们将如何维护文件中已被其他线程处理的记录数?
还要考虑如果我在 reader 的 Read 方法中提供同步关键字,那么不同的线程为什么会检查文件中已处理的记录?
这在多线程环境中是不可能的,如参考文档的 Multi-threaded 部分所述:
Many participants in a Step (such as readers and writers) are stateful.
If the state is not segregated by thread, then those components are not
usable in a multi-threaded Step
这就是为什么文档提到要关闭 AbstractItemCountingItemStreamItemReader#setSaveState 的 javadoc 上的状态管理,这里是摘录:
Always set it to false if the reader is being used in a concurrent environment.
考虑一个 Step bean:
@Bean
public Step stepForChunkProcessing() {
return stepBuilderFactory
.get("stepForChunkProcessing")
.<Entity1, Entity2>chunk(1000)
.reader(reader())
.processor(processor())
.writer(writer())
.taskExecutor(taskExecutor())
.throttleLimit(10)
.build();
}
//@formatter:on
@Bean
public TaskExecutor taskExecutor(){
return new SimpleAsyncTaskExecutor("MyApplication");
}
要求: 在 Reader 中,它从文件中读取(实体 1 的)记录。 在 Processor 中,它处理,在 Writer 中,它写入数据库。
在任务执行器之前, 只创建了一个线程并且 它会在 Reader 和处理器中循环 1000 次,如上面的块设置中所定义的那样。 然后它会移动到 writer 并写入所有 1000 条记录。 同样,它将从记录号 1001 开始,然后在 Reader 和处理器中处理另外 1000 条记录。 这是同步执行。
TaskExecutor和throttle limit为10后,创建了10个相互独立的线程。 他们将如何维护文件中已被其他线程处理的记录数? 还要考虑如果我在 reader 的 Read 方法中提供同步关键字,那么不同的线程为什么会检查文件中已处理的记录?
这在多线程环境中是不可能的,如参考文档的 Multi-threaded 部分所述:
Many participants in a Step (such as readers and writers) are stateful.
If the state is not segregated by thread, then those components are not
usable in a multi-threaded Step
这就是为什么文档提到要关闭 AbstractItemCountingItemStreamItemReader#setSaveState 的 javadoc 上的状态管理,这里是摘录:
Always set it to false if the reader is being used in a concurrent environment.