Spring Batch: parallel step execution in a Java configuration file

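I am trying to build a sample application that executes steps in parallel using a Java configuration file, but I am confused about how many pieces (job repository, job launcher, execution, etc.) have to be configured and initialized, and, if they do, how to configure them. I just need a sample application that clarifies the basics of executing steps in parallel within a job.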

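This is basic parallel step execution over different data sets. Essentially, you provide a Partitioner that creates a separate execution context for each step execution, and each execution then processes its own data set based on that context.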

    <batch:job id="myJob" job-repository="jobRepository">
        <batch:step id="master">
            <batch:partition step="step1" partitioner="stepPartitioner">
                <batch:handler grid-size="4" task-executor="taskExecutor"/>
            </batch:partition>
        </batch:step>
    </batch:job>

    <batch:step id="step1">
        <batch:tasklet>
            <batch:chunk reader="myReader" processor="myProcessor" writer="myWriter"
                         commit-interval="10"/>
        </batch:tasklet>
    </batch:step>


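    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.springframework.batch.core.partition.support.Partitioner;
    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.beans.factory.annotation.Autowired;

    public class StepPartitioner implements Partitioner {

        // DaoInterface is the application's own DAO; it returns one key per partition.
        @Autowired
        DaoInterface daoInterface;

        @Override
        public Map<String, ExecutionContext> partition(int gridSize) {
            Map<String, ExecutionContext> result = new HashMap<>();

            // Create one ExecutionContext per key; each entry becomes a separate execution of step1.
            List<String> keys = daoInterface.getUniqueKeyForStep();
            for (String key : keys) {
                ExecutionContext executionContext = new ExecutionContext();
                executionContext.putString("key", key);
                result.put(key, executionContext);
            }

            return result;
        }
    }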
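
Since the question asks for Java configuration, the XML above could be expressed roughly as follows. This is only a sketch under several assumptions: Spring Batch 4.x (where `JobBuilderFactory` and `StepBuilderFactory` are available), items of type `String` (a placeholder, since the thread does not say what is being processed), and reader, processor, writer and partitioner beans defined elsewhere. The class name `PartitionedJobConfiguration` and the thread-name prefix are made up for illustration.

```java
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;

@Configuration
@EnableBatchProcessing // registers JobRepository, JobLauncher and the two builder factories
public class PartitionedJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public TaskExecutor taskExecutor() {
        // Hypothetical thread-name prefix; one new thread per partition.
        return new SimpleAsyncTaskExecutor("partition-");
    }

    // Worker step: counterpart of <batch:step id="step1"> with commit-interval="10".
    // The String item type is an assumption.
    @Bean
    public Step step1(ItemReader<String> myReader,
                      ItemProcessor<String, String> myProcessor,
                      ItemWriter<String> myWriter) {
        return stepBuilderFactory.get("step1")
                .<String, String>chunk(10)
                .reader(myReader)
                .processor(myProcessor)
                .writer(myWriter)
                .build();
    }

    // Master step: counterpart of <batch:partition> with grid-size="4".
    @Bean
    public Step master(@Qualifier("step1") Step step1, Partitioner stepPartitioner) {
        return stepBuilderFactory.get("master")
                .partitioner("step1", stepPartitioner)
                .step(step1)
                .gridSize(4)
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public Job myJob(@Qualifier("master") Step master) {
        return jobBuilderFactory.get("myJob")
                .start(master)
                .build();
    }
}
```

In a setup like this the reader would typically be step-scoped, so that each partition can pick up its own `key` from the step execution context (for example via `#{stepExecutionContext['key']}`).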

Here is an example that uses a split via Java configuration. In this example, flow1 and flow2 are executed in parallel:

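import org.springframework.batch.core.Job;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

// JobBuilderFactory and StepBuilderFactory (Spring Batch 4.x) are registered by
// @EnableBatchProcessing, which also sets up the JobRepository and JobLauncher.
// This class assumes it is enabled elsewhere, e.g. by Spring Boot's batch auto-configuration.
@Configuration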
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Tasklet tasklet() {
        return new CountingTasklet();
    }

    @Bean
    public Flow flow1() {
        return new FlowBuilder<Flow>("flow1")
                .start(stepBuilderFactory.get("step1")
                        .tasklet(tasklet()).build())
                .build();
    }

    @Bean
    public Flow flow2() {
        return new FlowBuilder<Flow>("flow2")
                .start(stepBuilderFactory.get("step2")
                        .tasklet(tasklet()).build())
                .next(stepBuilderFactory.get("step3")
                        .tasklet(tasklet()).build())
                .build();
    }

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .start(flow1())
                .split(new SimpleAsyncTaskExecutor()).add(flow2())
                .end()
                .build();
    }

    public static class CountingTasklet implements Tasklet {

        @Override
        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
            System.out.println(String.format("%s has been executed on thread %s", chunkContext.getStepContext().getStepName(), Thread.currentThread().getName()));
            return RepeatStatus.FINISHED;
        }
    }
}

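Suppose you have steps A, B1, B2, B3 and C, and you want to run B1, B2 and B3 in parallel. You first need to create sub-flows for them and then add them to a single flow with a SimpleAsyncTaskExecutor():
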
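@Bean
public Job job()
{
    // stepA(), stepb1(), stepb2(), stepb3() and stepC() are assumed to be Step beans defined elsewhere.
    // Wrap each parallel step in its own sub-flow.
    final Flow flowB1 = new FlowBuilder<Flow>("subflowb1").start(stepb1()).build();
    final Flow flowB2 = new FlowBuilder<Flow>("subflowb2").start(stepb2()).build();
    final Flow flowB3 = new FlowBuilder<Flow>("subflowb3").start(stepb3()).build();

    // Run the three sub-flows concurrently, one thread per flow.
    final Flow splitFlow = new FlowBuilder<Flow>("splitFlow")
        .start(flowB1)
        .split(new SimpleAsyncTaskExecutor())
        .add(flowB2, flowB3).build();

    // A runs first, then the split, then C once all three sub-flows have finished.
    // The job name "job" is a placeholder.
    return jobBuilderFactory.get("job")
       .flow(stepA())
       .next(splitFlow)
       .next(stepC())
       .end()
       .build();
}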
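
To make the ordering guarantee of such a split concrete, here is a plain-Java analogy that uses only `java.util.concurrent`. It is not Spring Batch code and does not reflect how Spring Batch is implemented; the class name `SplitOrderingSketch` and the step names are purely illustrative. It only shows the behaviour one would expect from the job above: A finishes first, B1 to B3 run concurrently in no fixed order, and C starts only after all three are done.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

public class SplitOrderingSketch {

    /** Runs A, then B1..B3 concurrently, then C; returns the order in which the steps ran. */
    public static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>(); // thread-safe execution log
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            log.add("A"); // sequential step before the split

            // Submit the three "sub-flows" to the executor at the same time.
            CompletableFuture<?>[] branches = Stream.of("B1", "B2", "B3")
                    .map(name -> CompletableFuture.runAsync(() -> log.add(name), executor))
                    .toArray(CompletableFuture[]::new);

            // Wait until every branch has completed before moving on.
            CompletableFuture.allOf(branches).join();

            log.add("C"); // sequential step after the split
        } finally {
            executor.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        // The relative order of B1, B2 and B3 may differ between runs.
        System.out.println(run());
    }
}
```

The `allOf(...).join()` call plays the role of the split's implicit barrier: nothing after it runs until every parallel branch has completed.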