Spring 使用分区的批处理多线程
Spring batch multithreading using partitioning
我的问题陈述是 - 我必须将多个文件传递给 spring 批处理 reader 并且 reader 在 parellel.if 中运行 我们使用 grid-size = 100 那么就会有 100 个线程,这是不合逻辑的。
解决这个问题的方法是什么,即用有限的线程数处理许多文件。
@Bean
public Step orderStep1() throws IOException {
return stepBuilderFactory.get("orderStep1")
.partitioner("slaveStep", partitioner())
.step(slaveStep())
.gridSize(100)
.taskExecutor(taskExecutor())
.build();
}
Task executor will be
@Bean
public TaskExecutor taskExecutor() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
return taskExecutor;
}
partitoner will be
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> partitionData = new HashMap<String, ExecutionContext>();
for (int i = 0; i < gridSize; i++) {
ExecutionContext executionContext = new ExecutionContext();
executionContext.putString("file", fileList.get(i)); //passing filelist
executionContext.putString("name", "Thread" + i);
partitionData.put("partition: " + i, executionContext);
}
return partitionData;
}
并使用 stepExecutionContext
动态传递文件
if we use grid-size = 100 then there will be 100 threads which is not logical
网格大小和线程池大小是两个不同的东西。您可以处理 100 个分区,但只有 10 个工作线程可用。
您遇到的问题是您使用的 SimpleAsyncTaskExecutor
不会重复使用线程(请参阅其 Javadoc)。因此,对于每个分区,将创建一个新线程,您最终会看到 100 个分区有 100 个线程。
what is the way to solve this issue i.e. process many files with limited number of threads.
考虑使用 ThreadPoolTaskExecutor 以限制工作线程的数量。
我的问题陈述是 - 我必须将多个文件传递给 spring 批处理 reader 并且 reader 在 parellel.if 中运行 我们使用 grid-size = 100 那么就会有 100 个线程,这是不合逻辑的。 解决这个问题的方法是什么,即用有限的线程数处理许多文件。
@Bean
public Step orderStep1() throws IOException {
return stepBuilderFactory.get("orderStep1")
.partitioner("slaveStep", partitioner())
.step(slaveStep())
.gridSize(100)
.taskExecutor(taskExecutor())
.build();
}
Task executor will be
@Bean
public TaskExecutor taskExecutor() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
return taskExecutor;
}
partitoner will be
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> partitionData = new HashMap<String, ExecutionContext>();
for (int i = 0; i < gridSize; i++) {
ExecutionContext executionContext = new ExecutionContext();
executionContext.putString("file", fileList.get(i)); //passing filelist
executionContext.putString("name", "Thread" + i);
partitionData.put("partition: " + i, executionContext);
}
return partitionData;
}
并使用 stepExecutionContext
动态传递文件if we use grid-size = 100 then there will be 100 threads which is not logical
网格大小和线程池大小是两个不同的东西。您可以处理 100 个分区,但只有 10 个工作线程可用。
您遇到的问题是您使用的 SimpleAsyncTaskExecutor
不会重复使用线程(请参阅其 Javadoc)。因此,对于每个分区,将创建一个新线程,您最终会看到 100 个分区有 100 个线程。
what is the way to solve this issue i.e. process many files with limited number of threads.
考虑使用 ThreadPoolTaskExecutor 以限制工作线程的数量。