Spring Batch:在 Java 配置中并行执行步骤
Spring Batch: parallel step execution in Java configuration
我正在尝试用 Java 配置编写一个演示并行步骤执行的示例应用程序,但我不清楚需要配置和初始化哪些组件(作业存储库、作业启动器、执行器等);如果需要配置,又该如何配置?
我只需要一个示例应用程序,用来说明在一个作业中并行执行步骤的基本做法。
下面是一个在不同数据集上并行执行步骤的基本示例(基于分区)。基本思路是提供一个分区器(Partitioner),它为每个分区创建独立的执行上下文(ExecutionContext);每个并行执行的步骤再根据自己上下文中的数据(例如下面的 "key")处理对应的数据集。
<!--
  Partitioned job: the "master" step asks the stepPartitioner bean for a set of
  ExecutionContexts and runs one execution of "step1" per context, dispatching
  those executions through the taskExecutor bean.
-->
<batch:job id="myJob" job-repository="jobRepository">
    <batch:step id="master">
        <!-- Bean names are resolved literally, so the reference must not carry
             surrounding whitespace. -->
        <batch:partition step="step1" partitioner="stepPartitioner">
            <!-- grid-size is only the hint passed to Partitioner.partition(int);
                 a partitioner may ignore it (the example partitioner creates one
                 partition per key returned by its DAO). -->
            <batch:handler grid-size="4" task-executor="taskExecutor"/>
        </batch:partition>
    </batch:step>
</batch:job>
<!-- Worker step executed once per partition. NOTE(review): presumably myReader is
     step-scoped and reads the "key" entry of its step execution context; confirm. -->
<batch:step id="step1">
    <batch:tasklet>
        <batch:chunk reader="myReader" processor="myProcessor" writer="myWriter"
                     commit-interval="10"/>
    </batch:tasklet>
</batch:step>
/**
 * Partitioner that creates one partition per unique key reported by the DAO.
 *
 * <p>Each partition is named after its key, and its ExecutionContext stores that key under the
 * entry {@code "key"}. The grid-size hint is not consulted: the number of partitions equals the
 * number of keys returned by {@code daoInterface.getUniqueKeyForStep()}.
 */
public class stepPartitioner implements Partitioner {

    // Injected by Spring; supplies the keys that drive partitioning.
    @Autowired
    DaoInterface daoInterface;

    /**
     * Builds one execution context per key.
     *
     * @param gridSize grid-size hint from the partition handler (not used here)
     * @return map from partition name (the key itself) to that partition's context; if the DAO
     *     reports the same key twice, the later context replaces the earlier one
     */
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        final Map<String, ExecutionContext> contextsByKey = new HashMap<>();
        daoInterface.getUniqueKeyForStep().forEach(partitionKey -> {
            final ExecutionContext context = new ExecutionContext();
            context.putString("key", partitionKey);
            contextsByKey.put(partitionKey, context);
        });
        return contextsByKey;
    }
}
下面是一个通过 Java 配置使用拆分(split)的示例。在此示例中,flow1 和 flow2 将并行执行:
/**
 * Java-configuration example of a job whose two flows run in parallel.
 *
 * <p>{@code flow1} (step1) and {@code flow2} (step2 followed by step3) are joined by a split that
 * is backed by a {@link SimpleAsyncTaskExecutor}, so step1 runs concurrently with the
 * step2 then step3 sequence. Every step uses the shared {@code tasklet()} bean.
 *
 * <p>NOTE(review): JobBuilderFactory / StepBuilderFactory are Spring Batch 4.x APIs; they are
 * deprecated in 5.0 in favour of JobBuilder / StepBuilder with an explicit JobRepository.
 */
@Configuration
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /** Tasklet shared by all steps; it only reports which step ran on which thread. */
    @Bean
    public Tasklet tasklet() {
        return new CountingTasklet();
    }

    /** First parallel branch: a flow containing step1 only. */
    @Bean
    public Flow flow1() {
        FlowBuilder<Flow> builder = new FlowBuilder<Flow>("flow1");
        builder.start(stepBuilderFactory.get("step1").tasklet(tasklet()).build());
        return builder.build();
    }

    /** Second parallel branch: step2 followed by step3, executed sequentially. */
    @Bean
    public Flow flow2() {
        FlowBuilder<Flow> builder = new FlowBuilder<Flow>("flow2");
        builder.start(stepBuilderFactory.get("step2").tasklet(tasklet()).build());
        builder.next(stepBuilderFactory.get("step3").tasklet(tasklet()).build());
        return builder.build();
    }

    /** Job that starts flow1 and flow2 side by side via a split. */
    @Bean
    public Job job() {
        // SimpleAsyncTaskExecutor launches each branch on a new thread (no pooling by default).
        SimpleAsyncTaskExecutor splitExecutor = new SimpleAsyncTaskExecutor();
        return jobBuilderFactory.get("job")
                .start(flow1())
                .split(splitExecutor)
                .add(flow2())
                .end()
                .build();
    }

    /** Tasklet that prints the current step name and thread name, then finishes. */
    public static class CountingTasklet implements Tasklet {
        @Override
        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
            String stepName = chunkContext.getStepContext().getStepName();
            String threadName = Thread.currentThread().getName();
            System.out.println(String.format("%s has been executed on thread %s", stepName, threadName));
            return RepeatStatus.FINISHED;
        }
    }
}
假设您有步骤 A、B1、B2、B3、C,并希望并行运行 B1、B2 和 B3。您首先需要为它们分别创建子流程(subflow),然后通过 SimpleAsyncTaskExecutor() 将这些子流程添加到一个拆分(split)流程中:
/**
 * Builds a job that runs stepA, then stepb1, stepb2 and stepb3 in parallel, then stepC.
 *
 * <p>Each parallel step is wrapped in its own single-step sub-flow. The three sub-flows are
 * combined into one split flow whose branches are launched through a
 * {@link SimpleAsyncTaskExecutor}. Following Spring Batch split semantics, stepC starts only
 * after every branch of the split has finished.
 *
 * @return the assembled job (named "job")
 */
@Bean
public Job job()
{
    // One single-step sub-flow per step that should run concurrently.
    final Flow flowB1 = new FlowBuilder<Flow>("subflowb1").start(stepb1()).build();
    final Flow flowB2 = new FlowBuilder<Flow>("subflowb2").start(stepb2()).build();
    final Flow flowB3 = new FlowBuilder<Flow>("subflowb3").start(stepb3()).build();

    // flowB1 starts the split; flowB2 and flowB3 are added as sibling branches.
    final Flow splitFlow = new FlowBuilder<Flow>("splitFlow")
            .start(flowB1)
            .split(new SimpleAsyncTaskExecutor())
            .add(flowB2, flowB3)
            .build();

    // JobBuilderFactory only exposes get(name); the flow-style DSL (flow/next/end) is on JobBuilder.
    return jobBuilderFactory.get("job")
            .flow(stepA())
            .next(splitFlow)
            .next(stepC())
            .end()
            .build();
}
我正在尝试用 Java 配置编写一个演示并行步骤执行的示例应用程序,但我不清楚需要配置和初始化哪些组件(作业存储库、作业启动器、执行器等);如果需要配置,又该如何配置?我只需要一个示例应用程序,用来说明在一个作业中并行执行步骤的基本做法。
下面是一个在不同数据集上并行执行步骤的基本示例(基于分区)。基本思路是提供一个分区器(Partitioner),它为每个分区创建独立的执行上下文(ExecutionContext);每个并行执行的步骤再根据自己上下文中的数据(例如下面的 "key")处理对应的数据集。
<!--
  Partitioned job: the "master" step asks the stepPartitioner bean for a set of
  ExecutionContexts and runs one execution of "step1" per context, dispatching
  those executions through the taskExecutor bean.
-->
<batch:job id="myJob" job-repository="jobRepository">
    <batch:step id="master">
        <!-- Bean names are resolved literally, so the reference must not carry
             surrounding whitespace. -->
        <batch:partition step="step1" partitioner="stepPartitioner">
            <!-- grid-size is only the hint passed to Partitioner.partition(int);
                 a partitioner may ignore it (the example partitioner creates one
                 partition per key returned by its DAO). -->
            <batch:handler grid-size="4" task-executor="taskExecutor"/>
        </batch:partition>
    </batch:step>
</batch:job>
<!-- Worker step executed once per partition. NOTE(review): presumably myReader is
     step-scoped and reads the "key" entry of its step execution context; confirm. -->
<batch:step id="step1">
    <batch:tasklet>
        <batch:chunk reader="myReader" processor="myProcessor" writer="myWriter"
                     commit-interval="10"/>
    </batch:tasklet>
</batch:step>
/**
 * Partitioner that creates one partition per unique key reported by the DAO.
 *
 * <p>Each partition is named after its key, and its ExecutionContext stores that key under the
 * entry {@code "key"}. The grid-size hint is not consulted: the number of partitions equals the
 * number of keys returned by {@code daoInterface.getUniqueKeyForStep()}.
 */
public class stepPartitioner implements Partitioner {

    // Injected by Spring; supplies the keys that drive partitioning.
    @Autowired
    DaoInterface daoInterface;

    /**
     * Builds one execution context per key.
     *
     * @param gridSize grid-size hint from the partition handler (not used here)
     * @return map from partition name (the key itself) to that partition's context; if the DAO
     *     reports the same key twice, the later context replaces the earlier one
     */
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        final Map<String, ExecutionContext> contextsByKey = new HashMap<>();
        daoInterface.getUniqueKeyForStep().forEach(partitionKey -> {
            final ExecutionContext context = new ExecutionContext();
            context.putString("key", partitionKey);
            contextsByKey.put(partitionKey, context);
        });
        return contextsByKey;
    }
}
下面是一个通过 Java 配置使用拆分(split)的示例。在此示例中,flow1 和 flow2 将并行执行:
/**
 * Java-configuration example of a job whose two flows run in parallel.
 *
 * <p>{@code flow1} (step1) and {@code flow2} (step2 followed by step3) are joined by a split that
 * is backed by a {@link SimpleAsyncTaskExecutor}, so step1 runs concurrently with the
 * step2 then step3 sequence. Every step uses the shared {@code tasklet()} bean.
 *
 * <p>NOTE(review): JobBuilderFactory / StepBuilderFactory are Spring Batch 4.x APIs; they are
 * deprecated in 5.0 in favour of JobBuilder / StepBuilder with an explicit JobRepository.
 */
@Configuration
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /** Tasklet shared by all steps; it only reports which step ran on which thread. */
    @Bean
    public Tasklet tasklet() {
        return new CountingTasklet();
    }

    /** First parallel branch: a flow containing step1 only. */
    @Bean
    public Flow flow1() {
        FlowBuilder<Flow> builder = new FlowBuilder<Flow>("flow1");
        builder.start(stepBuilderFactory.get("step1").tasklet(tasklet()).build());
        return builder.build();
    }

    /** Second parallel branch: step2 followed by step3, executed sequentially. */
    @Bean
    public Flow flow2() {
        FlowBuilder<Flow> builder = new FlowBuilder<Flow>("flow2");
        builder.start(stepBuilderFactory.get("step2").tasklet(tasklet()).build());
        builder.next(stepBuilderFactory.get("step3").tasklet(tasklet()).build());
        return builder.build();
    }

    /** Job that starts flow1 and flow2 side by side via a split. */
    @Bean
    public Job job() {
        // SimpleAsyncTaskExecutor launches each branch on a new thread (no pooling by default).
        SimpleAsyncTaskExecutor splitExecutor = new SimpleAsyncTaskExecutor();
        return jobBuilderFactory.get("job")
                .start(flow1())
                .split(splitExecutor)
                .add(flow2())
                .end()
                .build();
    }

    /** Tasklet that prints the current step name and thread name, then finishes. */
    public static class CountingTasklet implements Tasklet {
        @Override
        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
            String stepName = chunkContext.getStepContext().getStepName();
            String threadName = Thread.currentThread().getName();
            System.out.println(String.format("%s has been executed on thread %s", stepName, threadName));
            return RepeatStatus.FINISHED;
        }
    }
}
假设您有步骤 A、B1、B2、B3、C,并希望并行运行 B1、B2 和 B3。您首先需要为它们分别创建子流程(subflow),然后通过 SimpleAsyncTaskExecutor() 将这些子流程添加到一个拆分(split)流程中:
@Bean
public Job job()
{
    // Runs stepA, then stepb1/stepb2/stepb3 in parallel, then stepC. Each parallel step is
    // wrapped in its own single-step sub-flow; following Spring Batch split semantics, stepC
    // starts only after every branch of the split has finished.
    final Flow flowB1 = new FlowBuilder<Flow>("subflowb1").start(stepb1()).build();
    final Flow flowB2 = new FlowBuilder<Flow>("subflowb2").start(stepb2()).build();
    final Flow flowB3 = new FlowBuilder<Flow>("subflowb3").start(stepb3()).build();

    // flowB1 starts the split; flowB2 and flowB3 are added as sibling branches, each launched
    // through the SimpleAsyncTaskExecutor.
    final Flow splitFlow = new FlowBuilder<Flow>("splitFlow")
            .start(flowB1)
            .split(new SimpleAsyncTaskExecutor())
            .add(flowB2, flowB3)
            .build();

    // JobBuilderFactory only exposes get(name); the flow-style DSL (flow/next/end) is on JobBuilder.
    return jobBuilderFactory.get("job")
            .flow(stepA())
            .next(splitFlow)
            .next(stepC())
            .end()
            .build();
}