如何使 spring 批处理步骤与可配置线程数并行执行?
How to make spring batch step execution parallel with configurable thread count?
我有以下 Spring Batch 应用程序:
SpringBatchApplication.java
package com.spbt.job.sample;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point that bootstraps the batch application context.
 */
@SpringBootApplication
public class SpringBatchApplication {

    /**
     * Builds a {@link SpringApplication} for this configuration class and runs it.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SpringBatchApplication.class);
        application.run(args);
    }
}
TraverseJob.java
package com.spbt.job.sample;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Batch configuration for the single-step "TraverseJob", which runs
 * {@link TraverseJobTasklet} over a local input directory.
 */
@Configuration
@EnableBatchProcessing
public class TraverseJob {

    @Autowired
    protected JobBuilderFactory jobBuilderFactory;

    @Autowired
    protected StepBuilderFactory stepBuilderFactory;

    /**
     * Local root directory handed to the tasklet. Can be overridden through the
     * {@code traverse.input-folder} property; when the property is absent the previously
     * hard-coded {@code /tmp/inputFolder} is used.
     */
    @Value("${traverse.input-folder:/tmp/inputFolder}")
    private String inputFolderPath = "/tmp/inputFolder";

    /**
     * Defines the job: a {@link RunIdIncrementer} plus the single {@link #traverseStep()}.
     */
    @Bean("TraverseJob")
    public Job job() {
        return jobBuilderFactory.get("TraverseJob")
                .incrementer(new RunIdIncrementer())
                .start(traverseStep())
                .build();
    }

    /**
     * Single tasklet step; the tasklet walks the whole input tree sequentially.
     * NOTE(review): to process sub-folders in parallel this would need to become a
     * partitioned manager step with one worker step per sub-folder.
     */
    @Bean("TraverseStep")
    public Step traverseStep() {
        // null is only a placeholder: the step-scoped bean receives the real job parameter at runtime.
        return stepBuilderFactory.get("TraverseStep")
                .tasklet(traverseJobTasklet(null))
                .build();
    }

    /**
     * Step-scoped tasklet, created per step execution so it can receive job parameters.
     *
     * @param date value of the {@code date} job parameter (null when the parameter is absent)
     * @return a tasklet configured with the job date and {@link #inputFolderPath}
     */
    @Bean("TraverseJobTasklet")
    @StepScope
    public TraverseJobTasklet traverseJobTasklet(@Value("#{jobParameters['date']}") String date) {
        TraverseJobTasklet tasklet = new TraverseJobTasklet();
        tasklet.setJobDate(date);
        tasklet.setJobDirPath(inputFolderPath);
        return tasklet;
    }
}
TraverseJobTasklet.java
package com.spbt.job.sample;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.File;
/**
 * Tasklet that walks a local directory tree and hands each entry to {@link RemoteFilePush}.
 * The traversal is sequential and completes in a single {@link #execute} call.
 */
public class TraverseJobTasklet implements Tasklet {

    /** Local root directory to traverse. */
    private String jobDirPath;

    /** Value of the job's {@code date} parameter; stored and exposed, not used by the traversal. */
    private String jobDate;

    @Autowired
    private RemoteFilePush remoteFilePush;

    /**
     * Traverses {@link #jobDirPath} and pushes its contents to the remote side.
     *
     * @return {@link RepeatStatus#FINISHED}, since the whole tree is handled in one invocation
     * @throws IllegalStateException if no directory path has been configured
     * @throws Exception if a directory cannot be listed or {@link RemoteFilePush} fails
     */
    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
        if (jobDirPath == null) {
            throw new IllegalStateException("jobDirPath has not been set");
        }
        traverseDir(new File(jobDirPath));
        return RepeatStatus.FINISHED;
    }

    /**
     * Recursively processes {@code dir}: files are pushed, and sub-directories are created
     * remotely and then descended into.
     *
     * @param dir local directory to process
     * @throws java.io.IOException if {@code dir} cannot be listed
     * @throws Exception any failure reported by {@link RemoteFilePush}
     */
    private void traverseDir(File dir) throws Exception {
        File[] children = dir.listFiles();
        if (children == null) {
            // listFiles() returns null (not an empty array) when the path is not a directory
            // or cannot be read; an empty directory simply yields no children.
            throw new java.io.IOException(
                    "cannot list dir (missing, not a directory, or unreadable) -> " + dir.getPath());
        }
        for (File child : children) {
            String name = child.getName();
            if (child.isDirectory()) {
                // A directory that already exists remotely is skipped entirely, including its contents.
                // NOTE(review): only the simple name is passed, so same-named directories at different
                // depths map to one remote name — confirm this matches RemoteFilePush's expectations.
                if (!remoteFilePush.isRemoteDirExist(name)) {
                    remoteFilePush.createRemoteDir(name);
                    traverseDir(child);
                }
            } else {
                remoteFilePush.pushFile(child.getPath());
            }
        }
    }

    public String getJobDirPath() {
        return jobDirPath;
    }

    public void setJobDirPath(String jobDirPath) {
        this.jobDirPath = jobDirPath;
    }

    public String getJobDate() {
        return jobDate;
    }

    public void setJobDate(String jobDate) {
        this.jobDate = jobDate;
    }
}
RemoteFilePush.java
package com.spbt.job.sample;
import org.springframework.stereotype.Component;
/**
 * Placeholder for the operations that talk to the remote server. Every method is currently
 * a stub; the real remote calls are still to be written.
 */
@Component
public class RemoteFilePush {

    /**
     * Reports whether a directory with the given name exists remotely.
     * Stub: always answers {@code false}.
     */
    public boolean isRemoteDirExist(String name) throws InterruptedException {
        // TODO: query the remote server for the directory.
        return false;
    }

    /**
     * Creates a directory with the given name remotely. Stub: does nothing yet.
     */
    public void createRemoteDir(String name) throws InterruptedException {
        // TODO: create the directory on the remote server.
    }

    /**
     * Sends the local file at {@code path} to the remote server.
     * Stub: only prints a confirmation line.
     */
    public void pushFile(String path) throws InterruptedException {
        // TODO: transfer the file to the remote server.
        System.out.println("Pushed");
    }
}
我想在保持 RemoteFilePush 逻辑不变的前提下，让 TraverseJobTasklet 的 traverseDir 方法并行地遍历和执行。我的 inputFolderPath 可以包含多个子目录，每个子目录中都有一些文件。
我尝试参照这个链接来适配我正在使用的 Spring Batch 版本，但那个示例是基于 XML 的。我只有一个 traverseStep，不知道该如何创建多个步骤？
Passing a sub-folder path string to each worker step is where I am hitting a wall with the Spring code. If you can point me to a reference, that would help; most of the examples online are XML-based.
下面是一个快速、自包含的 Java 配置示例:
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
/**
 * Self-contained sample of a partitioned step: a manager step splits the work into one
 * partition per sub-folder, and the worker partitions run on a task executor whose
 * concurrency is capped by the {@code partition.thread-count} property.
 */
@Configuration
@EnableBatchProcessing
public class PartitionJobSample {

    private final JobBuilderFactory jobs;
    private final StepBuilderFactory steps;

    /**
     * Maximum number of worker partitions running at the same time. Read from the
     * {@code partition.thread-count} property; defaults to 4 when the property is not set.
     */
    @Value("${partition.thread-count:4}")
    private int threadCount;

    public PartitionJobSample(JobBuilderFactory jobs, StepBuilderFactory steps) {
        this.jobs = jobs;
        this.steps = steps;
    }

    /**
     * Manager step: asks the partitioner for the partitions and runs {@link #workerStep()}
     * once per partition on {@link #taskExecutor()}.
     */
    @Bean
    public Step managerStep() {
        return steps.get("masterStep")
                .partitioner(workerStep().getName(), partitioner(null))
                .step(workerStep())
                // gridSize is only the hint handed to Partitioner.partition(); the partitioner
                // below ignores it and creates one partition per sub-folder.
                .gridSize(threadCount)
                .taskExecutor(taskExecutor())
                .build();
    }

    /**
     * Executor for the worker partitions. Without a limit, SimpleAsyncTaskExecutor starts a
     * new thread for every partition; the concurrency limit makes extra partitions wait for a
     * free slot, so at most {@link #threadCount} run at once.
     *
     * @throws IllegalStateException if the configured thread count is less than 1
     */
    @Bean
    public SimpleAsyncTaskExecutor taskExecutor() {
        if (threadCount < 1) {
            throw new IllegalStateException("partition.thread-count must be >= 1 but was " + threadCount);
        }
        SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
        executor.setConcurrencyLimit(threadCount);
        return executor;
    }

    /**
     * Step-scoped partitioner that creates one partition per sub-folder of {@code rootFolder}.
     * Each partition's execution context carries the sub-folder under the key {@code filePath}.
     *
     * @param rootFolder value of the {@code rootFolder} job parameter
     */
    @Bean
    @StepScope
    public Partitioner partitioner(@Value("#{jobParameters['rootFolder']}") String rootFolder) {
        List<String> subFolders = getSubFolders(rootFolder);
        return gridSize -> {
            Map<String, ExecutionContext> partitions = new HashMap<>(subFolders.size());
            for (String folder : subFolders) {
                ExecutionContext executionContext = new ExecutionContext();
                executionContext.put("filePath", folder);
                partitions.put("partition-for-" + folder, executionContext);
            }
            return partitions;
        };
    }

    /**
     * Returns the sub-folders to partition on. Currently a stub: it ignores
     * {@code rootFolder} and returns two fixed paths.
     */
    private List<String> getSubFolders(String rootFolder) {
        // TODO implement this
        return Arrays.asList("/data/folder1", "/data/folder2");
    }

    /** Worker step executed once per partition. */
    @Bean
    public Step workerStep() {
        return steps.get("workerStep")
                .tasklet(getTasklet(null))
                .build();
    }

    /**
     * Step-scoped tasklet bound to the current partition's {@code filePath}.
     *
     * @param filePath sub-folder taken from the partition's step execution context
     */
    @Bean
    @StepScope
    public Tasklet getTasklet(@Value("#{stepExecutionContext['filePath']}") String filePath) {
        return new TraverseJobTasklet(filePath);
    }

    /** Job consisting of the single partitioned manager step. */
    @Bean
    public Job job() {
        return jobs.get("job")
                .start(managerStep())
                .build();
    }

    /**
     * Launches the job with {@code rootFolder=/data}, then closes the context.
     * NOTE(review): closing right after run() assumes the launcher from
     * {@code @EnableBatchProcessing} runs the job synchronously — confirm if an async
     * launcher is configured.
     */
    public static void main(String[] args) throws Exception {
        try (AnnotationConfigApplicationContext context =
                     new AnnotationConfigApplicationContext(PartitionJobSample.class)) {
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            JobParameters jobParameters = new JobParametersBuilder()
                    .addString("rootFolder", "/data")
                    .toJobParameters();
            jobLauncher.run(job, jobParameters);
        }
    }

    /**
     * Worker tasklet for one sub-folder. It is static because it needs no reference to the
     * enclosing configuration instance.
     */
    static class TraverseJobTasklet implements Tasklet {

        private final String filePath;

        TraverseJobTasklet(String filePath) {
            this.filePath = filePath;
        }

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            // TODO call traversePath for filePath which is a sub-folder here
            System.out.println(Thread.currentThread().getName() + " processing sub-folder " + filePath);
            return RepeatStatus.FINISHED;
        }
    }
}
它将根目录作为作业参数传入，并执行一个分区步骤，其中每个工作步骤（worker step）处理一个子文件夹（即调用您的 tasklet）。
运行它之后，您应该会看到类似如下的输出:
SimpleAsyncTaskExecutor-2 processing sub-folder /data/folder1
SimpleAsyncTaskExecutor-1 processing sub-folder /data/folder2
请根据您自己的情况对它进行调整。
我有以下 Spring Batch 应用程序:
SpringBatchApplication.java
package com.spbt.job.sample;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point that bootstraps the batch application context.
 */
@SpringBootApplication
public class SpringBatchApplication {

    /**
     * Builds a {@link SpringApplication} for this configuration class and runs it.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SpringBatchApplication.class);
        application.run(args);
    }
}
TraverseJob.java
package com.spbt.job.sample;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Batch configuration for the single-step "TraverseJob", which runs
 * {@link TraverseJobTasklet} over a local input directory.
 */
@Configuration
@EnableBatchProcessing
public class TraverseJob {

    @Autowired
    protected JobBuilderFactory jobBuilderFactory;

    @Autowired
    protected StepBuilderFactory stepBuilderFactory;

    /**
     * Local root directory handed to the tasklet. Can be overridden through the
     * {@code traverse.input-folder} property; when the property is absent the previously
     * hard-coded {@code /tmp/inputFolder} is used.
     */
    @Value("${traverse.input-folder:/tmp/inputFolder}")
    private String inputFolderPath = "/tmp/inputFolder";

    /**
     * Defines the job: a {@link RunIdIncrementer} plus the single {@link #traverseStep()}.
     */
    @Bean("TraverseJob")
    public Job job() {
        return jobBuilderFactory.get("TraverseJob")
                .incrementer(new RunIdIncrementer())
                .start(traverseStep())
                .build();
    }

    /**
     * Single tasklet step; the tasklet walks the whole input tree sequentially.
     * NOTE(review): to process sub-folders in parallel this would need to become a
     * partitioned manager step with one worker step per sub-folder.
     */
    @Bean("TraverseStep")
    public Step traverseStep() {
        // null is only a placeholder: the step-scoped bean receives the real job parameter at runtime.
        return stepBuilderFactory.get("TraverseStep")
                .tasklet(traverseJobTasklet(null))
                .build();
    }

    /**
     * Step-scoped tasklet, created per step execution so it can receive job parameters.
     *
     * @param date value of the {@code date} job parameter (null when the parameter is absent)
     * @return a tasklet configured with the job date and {@link #inputFolderPath}
     */
    @Bean("TraverseJobTasklet")
    @StepScope
    public TraverseJobTasklet traverseJobTasklet(@Value("#{jobParameters['date']}") String date) {
        TraverseJobTasklet tasklet = new TraverseJobTasklet();
        tasklet.setJobDate(date);
        tasklet.setJobDirPath(inputFolderPath);
        return tasklet;
    }
}
TraverseJobTasklet.java
package com.spbt.job.sample;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.File;
/**
 * Tasklet that walks a local directory tree and hands each entry to {@link RemoteFilePush}.
 * The traversal is sequential and completes in a single {@link #execute} call.
 */
public class TraverseJobTasklet implements Tasklet {

    /** Local root directory to traverse. */
    private String jobDirPath;

    /** Value of the job's {@code date} parameter; stored and exposed, not used by the traversal. */
    private String jobDate;

    @Autowired
    private RemoteFilePush remoteFilePush;

    /**
     * Traverses {@link #jobDirPath} and pushes its contents to the remote side.
     *
     * @return {@link RepeatStatus#FINISHED}, since the whole tree is handled in one invocation
     * @throws IllegalStateException if no directory path has been configured
     * @throws Exception if a directory cannot be listed or {@link RemoteFilePush} fails
     */
    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
        if (jobDirPath == null) {
            throw new IllegalStateException("jobDirPath has not been set");
        }
        traverseDir(new File(jobDirPath));
        return RepeatStatus.FINISHED;
    }

    /**
     * Recursively processes {@code dir}: files are pushed, and sub-directories are created
     * remotely and then descended into.
     *
     * @param dir local directory to process
     * @throws java.io.IOException if {@code dir} cannot be listed
     * @throws Exception any failure reported by {@link RemoteFilePush}
     */
    private void traverseDir(File dir) throws Exception {
        File[] children = dir.listFiles();
        if (children == null) {
            // listFiles() returns null (not an empty array) when the path is not a directory
            // or cannot be read; an empty directory simply yields no children.
            throw new java.io.IOException(
                    "cannot list dir (missing, not a directory, or unreadable) -> " + dir.getPath());
        }
        for (File child : children) {
            String name = child.getName();
            if (child.isDirectory()) {
                // A directory that already exists remotely is skipped entirely, including its contents.
                // NOTE(review): only the simple name is passed, so same-named directories at different
                // depths map to one remote name — confirm this matches RemoteFilePush's expectations.
                if (!remoteFilePush.isRemoteDirExist(name)) {
                    remoteFilePush.createRemoteDir(name);
                    traverseDir(child);
                }
            } else {
                remoteFilePush.pushFile(child.getPath());
            }
        }
    }

    public String getJobDirPath() {
        return jobDirPath;
    }

    public void setJobDirPath(String jobDirPath) {
        this.jobDirPath = jobDirPath;
    }

    public String getJobDate() {
        return jobDate;
    }

    public void setJobDate(String jobDate) {
        this.jobDate = jobDate;
    }
}
RemoteFilePush.java
package com.spbt.job.sample;
import org.springframework.stereotype.Component;
/**
 * Placeholder for the operations that talk to the remote server. Every method is currently
 * a stub; the real remote calls are still to be written.
 */
@Component
public class RemoteFilePush {

    /**
     * Reports whether a directory with the given name exists remotely.
     * Stub: always answers {@code false}.
     */
    public boolean isRemoteDirExist(String name) throws InterruptedException {
        // TODO: query the remote server for the directory.
        return false;
    }

    /**
     * Creates a directory with the given name remotely. Stub: does nothing yet.
     */
    public void createRemoteDir(String name) throws InterruptedException {
        // TODO: create the directory on the remote server.
    }

    /**
     * Sends the local file at {@code path} to the remote server.
     * Stub: only prints a confirmation line.
     */
    public void pushFile(String path) throws InterruptedException {
        // TODO: transfer the file to the remote server.
        System.out.println("Pushed");
    }
}
我想在保持 RemoteFilePush 逻辑不变的前提下，让 TraverseJobTasklet 的 traverseDir 方法并行地遍历和执行。我的 inputFolderPath 可以包含多个子目录，每个子目录中都有一些文件。
我尝试参照这个链接来适配我正在使用的 Spring Batch 版本，但那个示例是基于 XML 的。我只有一个 traverseStep，不知道该如何创建多个步骤？
Passing a sub-folder path string to each worker step is where I am hitting a wall with the Spring code. If you can point me to a reference, that would help; most of the examples online are XML-based.
下面是一个快速、自包含的 Java 配置示例:
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
/**
 * Self-contained sample of a partitioned step: a manager step splits the work into one
 * partition per sub-folder, and the worker partitions run on a task executor whose
 * concurrency is capped by the {@code partition.thread-count} property.
 */
@Configuration
@EnableBatchProcessing
public class PartitionJobSample {

    private final JobBuilderFactory jobs;
    private final StepBuilderFactory steps;

    /**
     * Maximum number of worker partitions running at the same time. Read from the
     * {@code partition.thread-count} property; defaults to 4 when the property is not set.
     */
    @Value("${partition.thread-count:4}")
    private int threadCount;

    public PartitionJobSample(JobBuilderFactory jobs, StepBuilderFactory steps) {
        this.jobs = jobs;
        this.steps = steps;
    }

    /**
     * Manager step: asks the partitioner for the partitions and runs {@link #workerStep()}
     * once per partition on {@link #taskExecutor()}.
     */
    @Bean
    public Step managerStep() {
        return steps.get("masterStep")
                .partitioner(workerStep().getName(), partitioner(null))
                .step(workerStep())
                // gridSize is only the hint handed to Partitioner.partition(); the partitioner
                // below ignores it and creates one partition per sub-folder.
                .gridSize(threadCount)
                .taskExecutor(taskExecutor())
                .build();
    }

    /**
     * Executor for the worker partitions. Without a limit, SimpleAsyncTaskExecutor starts a
     * new thread for every partition; the concurrency limit makes extra partitions wait for a
     * free slot, so at most {@link #threadCount} run at once.
     *
     * @throws IllegalStateException if the configured thread count is less than 1
     */
    @Bean
    public SimpleAsyncTaskExecutor taskExecutor() {
        if (threadCount < 1) {
            throw new IllegalStateException("partition.thread-count must be >= 1 but was " + threadCount);
        }
        SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
        executor.setConcurrencyLimit(threadCount);
        return executor;
    }

    /**
     * Step-scoped partitioner that creates one partition per sub-folder of {@code rootFolder}.
     * Each partition's execution context carries the sub-folder under the key {@code filePath}.
     *
     * @param rootFolder value of the {@code rootFolder} job parameter
     */
    @Bean
    @StepScope
    public Partitioner partitioner(@Value("#{jobParameters['rootFolder']}") String rootFolder) {
        List<String> subFolders = getSubFolders(rootFolder);
        return gridSize -> {
            Map<String, ExecutionContext> partitions = new HashMap<>(subFolders.size());
            for (String folder : subFolders) {
                ExecutionContext executionContext = new ExecutionContext();
                executionContext.put("filePath", folder);
                partitions.put("partition-for-" + folder, executionContext);
            }
            return partitions;
        };
    }

    /**
     * Returns the sub-folders to partition on. Currently a stub: it ignores
     * {@code rootFolder} and returns two fixed paths.
     */
    private List<String> getSubFolders(String rootFolder) {
        // TODO implement this
        return Arrays.asList("/data/folder1", "/data/folder2");
    }

    /** Worker step executed once per partition. */
    @Bean
    public Step workerStep() {
        return steps.get("workerStep")
                .tasklet(getTasklet(null))
                .build();
    }

    /**
     * Step-scoped tasklet bound to the current partition's {@code filePath}.
     *
     * @param filePath sub-folder taken from the partition's step execution context
     */
    @Bean
    @StepScope
    public Tasklet getTasklet(@Value("#{stepExecutionContext['filePath']}") String filePath) {
        return new TraverseJobTasklet(filePath);
    }

    /** Job consisting of the single partitioned manager step. */
    @Bean
    public Job job() {
        return jobs.get("job")
                .start(managerStep())
                .build();
    }

    /**
     * Launches the job with {@code rootFolder=/data}, then closes the context.
     * NOTE(review): closing right after run() assumes the launcher from
     * {@code @EnableBatchProcessing} runs the job synchronously — confirm if an async
     * launcher is configured.
     */
    public static void main(String[] args) throws Exception {
        try (AnnotationConfigApplicationContext context =
                     new AnnotationConfigApplicationContext(PartitionJobSample.class)) {
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            JobParameters jobParameters = new JobParametersBuilder()
                    .addString("rootFolder", "/data")
                    .toJobParameters();
            jobLauncher.run(job, jobParameters);
        }
    }

    /**
     * Worker tasklet for one sub-folder. It is static because it needs no reference to the
     * enclosing configuration instance.
     */
    static class TraverseJobTasklet implements Tasklet {

        private final String filePath;

        TraverseJobTasklet(String filePath) {
            this.filePath = filePath;
        }

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            // TODO call traversePath for filePath which is a sub-folder here
            System.out.println(Thread.currentThread().getName() + " processing sub-folder " + filePath);
            return RepeatStatus.FINISHED;
        }
    }
}
它将根目录作为作业参数传入，并执行一个分区步骤，其中每个工作步骤（worker step）处理一个子文件夹（即调用您的 tasklet）。
运行它之后，您应该会看到类似如下的输出:
SimpleAsyncTaskExecutor-2 processing sub-folder /data/folder1
SimpleAsyncTaskExecutor-1 processing sub-folder /data/folder2
请根据您自己的情况对它进行调整。