如何让 Spring Batch 步骤以可配置的线程数并行执行?

How to make spring batch step execution parallel with configurable thread count?

我有以下 Spring Batch 应用程序:

SpringBatchApplication.java

package com.spbt.job.sample;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point.
 *
 * <p>{@code @SpringBootApplication} enables auto-configuration and component-scans this
 * package, which is how the batch configuration classes next to it are picked up.
 */
@SpringBootApplication
public class SpringBatchApplication {

    /**
     * Boots the application context.
     *
     * @param argv command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(final String[] argv) {
        final Class<?> primarySource = SpringBatchApplication.class;
        SpringApplication.run(primarySource, argv);
    }
}

TraverseJob.java

package com.spbt.job.sample;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * Batch configuration for the {@code TraverseJob} job.
 *
 * <p>The job consists of a single tasklet step, {@code TraverseStep}, which runs a
 * {@link TraverseJobTasklet} rooted at {@link #inputFolderPath}.
 */
@Configuration
@EnableBatchProcessing
public class TraverseJob {

    /** Input folder used when the {@code traverse.input-folder-path} property is not set. */
    private static final String DEFAULT_INPUT_FOLDER_PATH = "/tmp/inputFolder";

    @Autowired
    protected JobBuilderFactory jobBuilderFactory;

    @Autowired
    protected StepBuilderFactory stepBuilderFactory;

    /**
     * Root folder handed to the tasklet. Override it with the {@code traverse.input-folder-path}
     * property; without that property it falls back to {@value #DEFAULT_INPUT_FOLDER_PATH}.
     */
    @Value("${traverse.input-folder-path:" + DEFAULT_INPUT_FOLDER_PATH + "}")
    private String inputFolderPath = DEFAULT_INPUT_FOLDER_PATH;

    /**
     * Defines the job. {@link RunIdIncrementer} adds a fresh {@code run.id} parameter on each
     * launch so the same job can be started repeatedly.
     */
    @Bean("TraverseJob")
    public Job job() {
        return jobBuilderFactory.get("TraverseJob")
                .incrementer(new RunIdIncrementer())
                .start(traverseStep())
                .build();
    }

    /** Single step that executes the step-scoped traversal tasklet. */
    @Bean("TraverseStep")
    public Step traverseStep() {
        return stepBuilderFactory.get("TraverseStep")
                // null is only a placeholder: the tasklet bean is @StepScope, so the real
                // job parameter is injected late, when the step actually runs.
                .tasklet(traverseJobTasklet(null))
                .build();
    }

    /**
     * Creates the tasklet for the current step execution.
     *
     * @param date value of the {@code date} job parameter (null if the launcher did not pass one)
     * @return a tasklet configured with the job date and {@link #inputFolderPath}
     */
    @Bean("TraverseJobTasklet")
    @StepScope
    public TraverseJobTasklet traverseJobTasklet(@Value("#{jobParameters[date]}") String date) {
        TraverseJobTasklet tasklet = new TraverseJobTasklet();

        tasklet.setJobDate(date);
        tasklet.setJobDirPath(inputFolderPath);

        return tasklet;
    }
}

TraverseJobTasklet.java

package com.spbt.job.sample;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.File;
import java.io.IOException;

/**
 * Tasklet that mirrors a local directory tree to a remote server through {@link RemoteFilePush}.
 *
 * <p>The traversal is sequential and depth-first. Regular files are pushed by their local path.
 * Directories are created remotely by their simple name and then recursed into. If
 * {@code isRemoteDirExist} reports a directory as already present, that directory and everything
 * below it is skipped.
 *
 * <p>NOTE(review): remote directories are identified by leaf name only, so equally named folders
 * at different depths map to the same remote name. Confirm this is intended.
 */
public class TraverseJobTasklet implements Tasklet {

    /** Local root directory to traverse; must be set before {@link #execute} runs. */
    private String jobDirPath;
    /** Job date passed in from the job parameters; not read by the traversal itself. */
    private String jobDate;

    @Autowired
    private RemoteFilePush remoteFilePush;

    /**
     * Walks {@link #jobDirPath} once and completes the step.
     *
     * @return {@link RepeatStatus#FINISHED}, so the tasklet is invoked only once per step execution
     * @throws IllegalStateException if {@code jobDirPath} was never set
     * @throws IOException if a directory in the tree cannot be listed
     * @throws InterruptedException if a remote call is interrupted
     */
    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
        if (jobDirPath == null) {
            // Fail with a clear message instead of the NPE that new File(null) would throw.
            throw new IllegalStateException("jobDirPath must be set before the tasklet runs");
        }
        traverseDir(new File(jobDirPath));
        return RepeatStatus.FINISHED;
    }

    /**
     * Recursively pushes the contents of {@code filePath} to the remote server.
     *
     * @param filePath local directory to traverse
     * @throws IOException if {@code filePath} is missing, is not a directory, or cannot be read
     * @throws InterruptedException if a {@link RemoteFilePush} call is interrupted
     */
    private void traverseDir(File filePath) throws IOException, InterruptedException {
        File[] files = filePath.listFiles();
        // listFiles() returns an empty array for an empty directory; null means
        // "not a directory" or an I/O error, which is what we report here.
        if (files == null) {
            throw new IOException(
                    "cannot list directory (missing, not a directory, or I/O error) -> " + filePath.getPath());
        }
        for (File file : files) {
            String name = file.getName();
            if (file.isDirectory()) {
                // An existing remote directory is taken as already mirrored; skip its subtree.
                if (!remoteFilePush.isRemoteDirExist(name)) {
                    remoteFilePush.createRemoteDir(name);
                    traverseDir(file);
                }
            } else {
                remoteFilePush.pushFile(file.getPath());
            }
        }
    }


    public String getJobDirPath() {
        return jobDirPath;
    }

    public void setJobDirPath(String jobDirPath) {
        this.jobDirPath = jobDirPath;
    }

    public String getJobDate() {
        return jobDate;
    }

    public void setJobDate(String jobDate) {
        this.jobDate = jobDate;
    }
}

RemoteFilePush.java

package com.spbt.job.sample;

import org.springframework.stereotype.Component;

/**
 * Access point for operations on the remote server that receives the mirrored files.
 *
 * <p>Every method here is still a placeholder; the actual remote calls are not implemented yet.
 */
@Component
public class RemoteFilePush {

    /**
     * Checks whether a directory with the given name exists on the remote server.
     *
     * @param name directory name to look up
     * @return always {@code false} in this placeholder implementation
     * @throws InterruptedException declared by the signature; the placeholder never throws it
     */
    public boolean isRemoteDirExist(String name) throws InterruptedException {
        // TODO: query the remote server for the directory.
        return false;
    }

    /**
     * Creates a directory with the given name on the remote server.
     *
     * @param name directory name to create
     * @throws InterruptedException declared by the signature; the placeholder never throws it
     */
    public void createRemoteDir(String name) throws InterruptedException {
        // TODO: create the directory on the remote server.
    }

    /**
     * Uploads the file at the given local path to the remote server.
     *
     * @param path local path of the file to push
     * @throws InterruptedException declared by the signature; the placeholder never throws it
     */
    public void pushFile(String path) throws InterruptedException {
        // TODO: upload the file to the remote server.
        final String confirmation = "Pushed";
        System.out.println(confirmation);
    }
}

我想在 TraverseJobTasklet 的 traverseDir 方法中并行地遍历和处理目录,同时保持 RemoteFilePush 的逻辑不变。我的 inputFolderPath 下可能包含多个子目录,每个子目录中都有若干文件。

我已尝试参考针对我所用 Spring Batch 版本的这个链接(link),但其中的示例是基于 XML 配置的。我只有一个 traverseStep,不知道该如何创建多个步骤。

Passing a sub-folder path as input to each worker step is where I am hitting a wall with the Spring code. If you can point me to a reference, that would be helpful; most of the examples online are XML-based.

下面是一个基于 Java 配置的简短、自包含示例:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

/**
 * Self-contained sample of a partitioned Spring Batch job using Java configuration.
 *
 * <p>The manager step turns the {@code rootFolder} job parameter into one partition per
 * sub-folder. It then runs {@link #workerStep()} once per partition on {@link #taskExecutor()}.
 * The number of worker steps running at the same time is capped by the
 * {@code partition.thread-count} property. The default, -1, means no cap.
 */
@Configuration
@EnableBatchProcessing
public class PartitionJobSample {

    private final JobBuilderFactory jobs;
    private final StepBuilderFactory steps;

    /**
     * Maximum number of worker steps executing concurrently.
     *
     * <p>-1 (the {@code SimpleAsyncTaskExecutor} default) means unbounded. Use a positive value to
     * cap the thread count; 0 would forbid execution altogether.
     */
    @Value("${partition.thread-count:-1}")
    private int threadCount = -1;

    public PartitionJobSample(JobBuilderFactory jobs, StepBuilderFactory steps) {
        this.jobs = jobs;
        this.steps = steps;
    }

    /** Manager step that fans the sub-folders out to {@link #workerStep()} instances. */
    @Bean
    public Step managerStep() {
        return steps.get("masterStep")
                .partitioner(workerStep().getName(), partitioner(null))
                .step(workerStep())
                // gridSize is only a hint to the partitioner, which creates one partition per
                // sub-folder regardless; actual parallelism is bounded by the task executor.
                .gridSize(4)
                .taskExecutor(taskExecutor())
                .build();
    }

    /**
     * Executor that runs the worker steps, limited to {@link #threadCount} concurrent threads.
     * Once the limit is reached, further partitions wait until a running one finishes.
     */
    @Bean
    public SimpleAsyncTaskExecutor taskExecutor() {
        // TODO useful for testing, use a more robust (pooled) task executor in production
        SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
        executor.setConcurrencyLimit(threadCount);
        return executor;
    }

    /**
     * Step-scoped partitioner creating one partition per sub-folder of {@code rootFolder}.
     *
     * @param rootFolder value of the {@code rootFolder} job parameter
     * @return a partitioner whose contexts each carry one sub-folder under the key {@code filePath}
     */
    @Bean
    @StepScope
    public Partitioner partitioner(@Value("#{jobParameters['rootFolder']}") String rootFolder) {
        List<String> subFolders = getSubFolders(rootFolder);
        // The data decides the split, so gridSize is intentionally ignored.
        return gridSize -> {
            Map<String, ExecutionContext> partitions = new HashMap<>();
            for (String folder : subFolders) {
                ExecutionContext executionContext = new ExecutionContext();
                executionContext.put("filePath", folder);
                partitions.put("partition-for-" + folder, executionContext);
            }
            return partitions;
        };
    }

    private List<String> getSubFolders(String rootFolder) {
        // TODO implement this
        return Arrays.asList("/data/folder1", "/data/folder2");
    }

    /** Worker step executed once per partition. */
    @Bean
    public Step workerStep() {
        return steps.get("workerStep")
                .tasklet(getTasklet(null))
                .build();
    }

    /**
     * Step-scoped tasklet bound to the sub-folder of the current partition.
     *
     * @param filePath sub-folder stored under {@code filePath} in the partition's execution context
     */
    @Bean
    @StepScope
    public Tasklet getTasklet(@Value("#{stepExecutionContext['filePath']}") String filePath) {
        return new TraverseJobTasklet(filePath);
    }

    @Bean
    public Job job() {
        return jobs.get("job")
                .start(managerStep())
                .build();
    }

    /**
     * Launches the sample job with {@code rootFolder=/data}.
     *
     * <p>The context is closed when the launch returns. This assumes the default synchronous
     * {@code JobLauncher}.
     */
    public static void main(String[] args) throws Exception {
        try (AnnotationConfigApplicationContext context =
                     new AnnotationConfigApplicationContext(PartitionJobSample.class)) {
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            JobParameters jobParameters = new JobParametersBuilder()
                    .addString("rootFolder", "/data")
                    .toJobParameters();
            jobLauncher.run(job, jobParameters);
        }
    }

    /**
     * Placeholder worker tasklet handling one sub-folder.
     *
     * <p>It is static because it needs no access to the enclosing configuration instance.
     */
    static class TraverseJobTasklet implements Tasklet {

        private final String filePath;

        TraverseJobTasklet(String filePath) {
            this.filePath = filePath;
        }

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            // TODO call traversePath for filePath which is a sub-folder here
            System.out.println(Thread.currentThread().getName() + " processing sub-folder " + filePath);
            return RepeatStatus.FINISHED;
        }
    }

}

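它将根目录作为作业参数传入,并执行一个分区步骤:每个 worker 步骤处理一个子目录(即调用你的 tasklet)。同时运行的线程数由分区步骤所用的 taskExecutor 决定。例如,可以对 SimpleAsyncTaskExecutor 调用 setConcurrencyLimit,或者改用设置了线程池大小的 ThreadPoolTaskExecutor。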

运行它,你应该会看到类似如下的输出:

SimpleAsyncTaskExecutor-2 processing sub-folder /data/folder1
SimpleAsyncTaskExecutor-1 processing sub-folder /data/folder2

其余部分请根据你自己的实际情况进行调整。