Spring 在 tasklet 中批量执行动态生成的步骤
Spring batch execute dynamically generated steps in a tasklet
我有一个 spring 批处理作业,可以执行以下操作...
第 1 步。创建需要处理的对象列表
步骤 2。根据步骤 1 中创建的对象列表中的项目数创建步骤列表。
第 3 步。尝试执行第 2 步中创建的步骤列表中的步骤。
执行 x 个步骤在下面的 executeDynamicStepsTasklet() 中完成。虽然代码运行没有任何错误,但它似乎没有做任何事情。我在那个方法中的内容看起来正确吗?
谢谢
/*
*
*/
@Configuration
public class ExportMasterListCsvJobConfig {
public static final String JOB_NAME = "exportMasterListCsv";
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Value("${exportMasterListCsv.generateMasterListRows.chunkSize}")
public int chunkSize;
@Value("${exportMasterListCsv.generateMasterListRows.masterListSql}")
public String masterListSql;
@Autowired
public DataSource onlineStagingDb;
@Value("${out.dir}")
public String outDir;
@Value("${exportMasterListCsv.generatePromoStartDateEndDateGroupings.promoStartDateEndDateSql}")
private String promoStartDateEndDateSql;
private List<DivisionIdPromoCompStartDtEndDtGrouping> divisionIdPromoCompStartDtEndDtGrouping;
private List<Step> dynamicSteps = Collections.synchronizedList(new ArrayList<Step>()) ;
@Bean
public Job exportMasterListCsvJob(
@Qualifier("createJobDatesStep") Step createJobDatesStep,
@Qualifier("createDynamicStepsStep") Step createDynamicStepsStep,
@Qualifier("executeDynamicStepsStep") Step executeDynamicStepsStep) {
return jobBuilderFactory.get(JOB_NAME)
.flow(createJobDatesStep)
.next(createDynamicStepsStep)
.next(executeDynamicStepsStep)
.end().build();
}
@Bean
public Step executeDynamicStepsStep(
@Qualifier("executeDynamicStepsTasklet") Tasklet executeDynamicStepsTasklet) {
return stepBuilderFactory
.get("executeDynamicStepsStep")
.tasklet(executeDynamicStepsTasklet)
.build();
}
@Bean
public Tasklet executeDynamicStepsTasklet() {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
FlowStep flowStep = new FlowStep(createParallelFlow());
SimpleJobBuilder jobBuilder = jobBuilderFactory.get("myNewJob").start(flowStep);
return RepeatStatus.FINISHED;
}
};
}
public Flow createParallelFlow() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setConcurrencyLimit(1);
List<Flow> flows = dynamicSteps.stream()
.map(step -> new FlowBuilder<Flow>("flow_" + step.getName()).start(step).build())
.collect(Collectors.toList());
return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
.split(taskExecutor)
.add(flows.toArray(new Flow[flows.size()]))
.build();
}
@Bean
public Step createDynamicStepsStep(
@Qualifier("createDynamicStepsTasklet") Tasklet createDynamicStepsTasklet) {
return stepBuilderFactory
.get("createDynamicStepsStep")
.tasklet(createDynamicStepsTasklet)
.build();
}
@Bean
@JobScope
public Tasklet createDynamicStepsTasklet() {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
for (DivisionIdPromoCompStartDtEndDtGrouping grp: divisionIdPromoCompStartDtEndDtGrouping){
System.err.println("grp: " + grp);
String stepName = "stp_" + grp;
String fileName = grp + FlatFileConstants.EXTENSION_CSV;
Step dynamicStep =
stepBuilderFactory.get(stepName)
.<MasterList,MasterList> chunk(10)
.reader(queryStagingDbReader(
grp.getDivisionId(),
grp.getRpmPromoCompDetailStartDate(),
grp.getRpmPromoCompDetailEndDate()))
.writer(masterListFileWriter(fileName))
.build();
dynamicSteps.add(dynamicStep);
}
System.err.println("createDynamicStepsTasklet dynamicSteps: " + dynamicSteps);
return RepeatStatus.FINISHED;
}
};
}
public FlatFileItemWriter<MasterList> masterListFileWriter(String fileName) {
FlatFileItemWriter<MasterList> writer = new FlatFileItemWriter<>();
writer.setResource(new FileSystemResource(new File(outDir, fileName )));
writer.setHeaderCallback(masterListFlatFileHeaderCallback());
writer.setLineAggregator(masterListFormatterLineAggregator());
return writer;
}
所以现在我有一个需要执行的动态步骤列表,我相信它们在 StepScope 中。有人可以告诉我如何执行它们吗
这行不通。您的 Tasklet 只是创建一个以 FlowStep 作为第一步的作业。使用 jobBuilderfactory 只是创建工作。它不会启动它。方法名 "start" 可能会产生误导,因为它只定义了第一步。但它不会启动作业。
作业一旦启动就无法更改其结构(其步骤和子步骤)。因此,不可能根据步骤 1 中计算的内容在步骤 2 中配置流程步骤。(当然,您可以在 springbatch 结构中进行更深入的修改并直接修改 bean 等等......但你不我不想那样做)。
我建议您使用一种 "SetupBean" 和适当的 postConstruct 方法,该方法被注入到配置您的作业的 class 中。这个"SetupBean"负责计算正在处理的对象列表。
@Component
public class SetUpBean {
private List<Object> myObjects;
@PostConstruct
public afterPropertiesSet() {
myObjects = ...;
}
public List<Object> getMyObjects() {
return myObjects;
}
}
@Configuration
public class JobConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private SetUpBean setup;
...
}
我有一个 spring 批处理作业,可以执行以下操作...
第 1 步。创建需要处理的对象列表
步骤 2。根据步骤 1 中创建的对象列表中的项目数创建步骤列表。
第 3 步。尝试执行第 2 步中创建的步骤列表中的步骤。
执行 x 个步骤在下面的 executeDynamicStepsTasklet() 中完成。虽然代码运行没有任何错误,但它似乎没有做任何事情。我在那个方法中的内容看起来正确吗?
谢谢
/* * */
@Configuration
public class ExportMasterListCsvJobConfig {
public static final String JOB_NAME = "exportMasterListCsv";
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Value("${exportMasterListCsv.generateMasterListRows.chunkSize}")
public int chunkSize;
@Value("${exportMasterListCsv.generateMasterListRows.masterListSql}")
public String masterListSql;
@Autowired
public DataSource onlineStagingDb;
@Value("${out.dir}")
public String outDir;
@Value("${exportMasterListCsv.generatePromoStartDateEndDateGroupings.promoStartDateEndDateSql}")
private String promoStartDateEndDateSql;
private List<DivisionIdPromoCompStartDtEndDtGrouping> divisionIdPromoCompStartDtEndDtGrouping;
private List<Step> dynamicSteps = Collections.synchronizedList(new ArrayList<Step>()) ;
@Bean
public Job exportMasterListCsvJob(
@Qualifier("createJobDatesStep") Step createJobDatesStep,
@Qualifier("createDynamicStepsStep") Step createDynamicStepsStep,
@Qualifier("executeDynamicStepsStep") Step executeDynamicStepsStep) {
return jobBuilderFactory.get(JOB_NAME)
.flow(createJobDatesStep)
.next(createDynamicStepsStep)
.next(executeDynamicStepsStep)
.end().build();
}
@Bean
public Step executeDynamicStepsStep(
@Qualifier("executeDynamicStepsTasklet") Tasklet executeDynamicStepsTasklet) {
return stepBuilderFactory
.get("executeDynamicStepsStep")
.tasklet(executeDynamicStepsTasklet)
.build();
}
@Bean
public Tasklet executeDynamicStepsTasklet() {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
FlowStep flowStep = new FlowStep(createParallelFlow());
SimpleJobBuilder jobBuilder = jobBuilderFactory.get("myNewJob").start(flowStep);
return RepeatStatus.FINISHED;
}
};
}
public Flow createParallelFlow() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setConcurrencyLimit(1);
List<Flow> flows = dynamicSteps.stream()
.map(step -> new FlowBuilder<Flow>("flow_" + step.getName()).start(step).build())
.collect(Collectors.toList());
return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
.split(taskExecutor)
.add(flows.toArray(new Flow[flows.size()]))
.build();
}
@Bean
public Step createDynamicStepsStep(
@Qualifier("createDynamicStepsTasklet") Tasklet createDynamicStepsTasklet) {
return stepBuilderFactory
.get("createDynamicStepsStep")
.tasklet(createDynamicStepsTasklet)
.build();
}
@Bean
@JobScope
public Tasklet createDynamicStepsTasklet() {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
for (DivisionIdPromoCompStartDtEndDtGrouping grp: divisionIdPromoCompStartDtEndDtGrouping){
System.err.println("grp: " + grp);
String stepName = "stp_" + grp;
String fileName = grp + FlatFileConstants.EXTENSION_CSV;
Step dynamicStep =
stepBuilderFactory.get(stepName)
.<MasterList,MasterList> chunk(10)
.reader(queryStagingDbReader(
grp.getDivisionId(),
grp.getRpmPromoCompDetailStartDate(),
grp.getRpmPromoCompDetailEndDate()))
.writer(masterListFileWriter(fileName))
.build();
dynamicSteps.add(dynamicStep);
}
System.err.println("createDynamicStepsTasklet dynamicSteps: " + dynamicSteps);
return RepeatStatus.FINISHED;
}
};
}
public FlatFileItemWriter<MasterList> masterListFileWriter(String fileName) {
FlatFileItemWriter<MasterList> writer = new FlatFileItemWriter<>();
writer.setResource(new FileSystemResource(new File(outDir, fileName )));
writer.setHeaderCallback(masterListFlatFileHeaderCallback());
writer.setLineAggregator(masterListFormatterLineAggregator());
return writer;
}
所以现在我有一个需要执行的动态步骤列表,我相信它们在 StepScope 中。有人可以告诉我如何执行它们吗
这行不通。您的 Tasklet 只是创建一个以 FlowStep 作为第一步的作业。使用 jobBuilderfactory 只是创建工作。它不会启动它。方法名 "start" 可能会产生误导,因为它只定义了第一步。但它不会启动作业。
作业一旦启动就无法更改其结构(其步骤和子步骤)。因此,不可能根据步骤 1 中计算的内容在步骤 2 中配置流程步骤。(当然,您可以在 springbatch 结构中进行更深入的修改并直接修改 bean 等等......但你不我不想那样做)。
我建议您使用一种 "SetupBean" 和适当的 postConstruct 方法,该方法被注入到配置您的作业的 class 中。这个"SetupBean"负责计算正在处理的对象列表。
@Component
public class SetUpBean {
private List<Object> myObjects;
@PostConstruct
public afterPropertiesSet() {
myObjects = ...;
}
public List<Object> getMyObjects() {
return myObjects;
}
}
@Configuration
public class JobConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private SetUpBean setup;
...
}