Spring Batch - Looping a reader/processor/writer step
Answer
Based on the code in the accepted answer, the following adjustment to it worked for me:
// helper method to create a split flow out of a List of steps
private static Flow createParallelFlow(List<Step> steps) {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
    taskExecutor.setConcurrencyLimit(steps.size());

    Flow[] flows = new Flow[steps.size()];
    for (int i = 0; i < steps.size(); i++) {
        flows[i] = new FlowBuilder<SimpleFlow>(steps.get(i).getName()).start(steps.get(i)).build();
    }

    return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
        .split(taskExecutor)
        .add(flows)
        .build();
}
Edit
I have updated the question to a version that loops correctly, but as the application scales, being able to process in parallel becomes important, and I still don't know how to set that up dynamically at runtime with javaconfig.
Improved question: how do I dynamically create a reader-processor-writer at runtime for, say, 5 different situations (5 queries means 5 loops, as it is configured now)?
My LoopDecider looks like this:
public class LoopDecider implements JobExecutionDecider {

    private static final Logger LOG = LoggerFactory.getLogger(LoopDecider.class);
    private static final String COMPLETED = "COMPLETED";
    private static final String CONTINUE = "CONTINUE";
    private static final String ALL = "queries";
    private static final String COUNT = "count";

    private int currentQuery;
    private int limit;

    @SuppressWarnings("unchecked")
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        List<String> allQueries = (List<String>) jobExecution.getExecutionContext().get(ALL);
        this.limit = allQueries.size();
        jobExecution.getExecutionContext().put(COUNT, currentQuery);
        if (++currentQuery >= limit) {
            return new FlowExecutionStatus(COMPLETED);
        } else {
            LOG.info("Looping for query: " + allQueries.get(currentQuery - 1));
            return new FlowExecutionStatus(CONTINUE);
        }
    }
}
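Stripped of the Spring Batch types, the decider above is just a counter: it returns CONTINUE until the last query has been reached, then COMPLETED. The following is a minimal plain-Java sketch of only that control logic (the `LoopCounter` class and `simulate` helper are illustrative, not part of Spring Batch), useful for checking how many times the step will actually run:

```java
import java.util.ArrayList;
import java.util.List;

public class LoopCounter {

    private int currentQuery;

    // Mirrors the decider: returns "CONTINUE" while queries remain,
    // "COMPLETED" once the last query has been reached.
    public String decide(int totalQueries) {
        if (++currentQuery >= totalQueries) {
            return "COMPLETED";
        }
        return "CONTINUE";
    }

    // Calls decide() until COMPLETED, collecting the statuses in order.
    // For n queries this yields n-1 CONTINUEs followed by one COMPLETED,
    // i.e. the step runs n times in total (once before the decider plus
    // once per CONTINUE transition).
    public static List<String> simulate(int totalQueries) {
        LoopCounter counter = new LoopCounter();
        List<String> statuses = new ArrayList<>();
        String status;
        do {
            status = counter.decide(totalQueries);
            statuses.add(status);
        } while (!status.equals("COMPLETED"));
        return statuses;
    }
}
```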
Based on a list of queries (HQL queries), I want one reader - processor - writer per query. My current configuration looks like this:
Job
@Bean
public Job subsetJob() throws Exception {
    LoopDecider loopDecider = new LoopDecider();
    FlowBuilder<Flow> flowBuilder = new FlowBuilder<>(FLOW_NAME);
    Flow flow = flowBuilder
        .start(createHQL())
        .next(extractData())
        .next(loopDecider)
        .on("CONTINUE")
        .to(extractData())
        .from(loopDecider)
        .on("COMPLETED")
        .end()
        .build();

    return jobBuilderFactory.get("subsetJob")
        .start(flow)
        .end()
        .build();
}
Step
public Step extractData(){
    return stepBuilderFactory.get("extractData")
        .chunk(100_000)
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .build();
}
Reader
public HibernateCursorItemReader reader(){
    CustomHibernateCursorItemReader reader = new CustomHibernateCursorItemReader();
    reader.setSessionFactory(HibernateUtil.getSessionFactory());
    reader.setUseStatelessSession(false);
    return reader;
}
Processor
public DynamicRecordProcessor processor(){
    return new DynamicRecordProcessor();
}
Writer
public FlatFileItemWriter writer(){
    CustomFlatFileItemWriter writer = new CustomFlatFileItemWriter();
    writer.setLineAggregator(new DelimitedLineAggregator(){{
        setDelimiter(TARGET_DELIMITER);
        setFieldExtractor(new PassThroughFieldExtractor());
    }});
    return writer;
}
Currently the process works fine for a single query. However, I actually have a list of queries.
My initial idea was to loop the step, pass the list of queries to it, and read - process - write for each query. This would also be ideal for parallel chunking.
However, when I add the list of queries as a parameter to the extractData step and create a step for each query, a list of steps is returned instead of the expected single step. The job then complains that it expects a single step rather than a list of steps.
Another idea was to create a custom MultiHibernateCursorItemReader, in the same spirit as the MultiResourceItemReader, but I am really looking for a more out-of-the-box solution.
@Bean
public List<Step> extractData(@Value("#{jobExecutionContext[HQL]}") List<String> queries){
    List<Step> steps = new ArrayList<Step>();
    for (String query : queries) {
        steps.add(stepBuilderFactory.get("extractData")
            .chunk(100_000)
            .reader(reader(query))
            .processor(processor())
            .writer(writer(query))
            .build());
    }
    return steps;
}
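The MultiHibernateCursorItemReader idea mentioned above boils down to a delegating reader that exhausts one source, then silently switches to the next, and signals end-of-input with null only after the last one. Spring's interfaces aside, the delegation pattern can be sketched in plain Java as follows (the `MultiSourceReader` class is illustrative, not a Spring Batch type; a real implementation would implement Spring's ItemStreamReader and open the next query's Hibernate cursor on each switch):

```java
import java.util.Iterator;
import java.util.List;

// Minimal sketch of a reader that chains several item sources,
// MultiResourceItemReader-style: each inner list stands in for the
// result set of one query.
public class MultiSourceReader<T> {

    private final Iterator<List<T>> sources;
    private Iterator<T> current;

    public MultiSourceReader(List<List<T>> sources) {
        this.sources = sources.iterator();
    }

    // Returns the next item, or null when every source is exhausted
    // (null is Spring Batch's end-of-input convention).
    public T read() {
        while (current == null || !current.hasNext()) {
            if (!sources.hasNext()) {
                return null;
            }
            // In a real reader this is where the next query's cursor
            // would be opened.
            current = sources.next().iterator();
        }
        return current.next();
    }
}
```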
Question
How do I loop the step and integrate it into the job?
Answer
Do not instantiate your steps, readers, processors and writers as Spring beans. There is no need to. Only your job instance has to be a Spring bean.
So just remove the @Bean and @StepScope configuration from your step, reader, writer and processor creator methods and instantiate them where needed.
There is only one catch: you have to call afterPropertiesSet() manually. For example:
// @Bean -> delete
// @StepScope -> delete
public FlatFileItemWriter writer(@Value("#{jobExecutionContext[fileName]}") String fileName){
    FlatFileItemWriter writer = new FlatFileItemWriter();
    writer.setResource(new FileSystemResource(new File(TARGET_LOCATION + fileName + TARGET_FILE_EXTENSION)));
    writer.setLineAggregator(new DelimitedLineAggregator(){{
        setDelimiter(TARGET_DELIMITER);
        setFieldExtractor(new PassThroughFieldExtractor());
    }});

    // ------- ADD!!
    writer.afterPropertiesSet();
    return writer;
}
This way, your step, reader and writer instances are automatically "step scoped", because you instantiate them explicitly for every step.
Let me know if my answer is not clear enough; I will then add a more detailed example.
Edit
A simple example:
@Configuration
public class MyJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    List<String> filenames = Arrays.asList("file1.txt", "file2.txt");

    @Bean
    public Job myJob() {
        List<Step> steps = filenames.stream()
            .map(filename -> createStep(filename))
            .collect(Collectors.toList());

        return jobBuilderFactory.get("subsetJob")
            .start(createParallelFlow(steps))
            .end()
            .build();
    }

    // helper method to create a step
    private Step createStep(String filename) {
        return stepBuilderFactory.get("convertStepFor" + filename) // !!! Stepname has to be unique
            .chunk(100_000)
            .reader(createFileReader(new FileSystemResource(new File(filename)), new YourInputLineMapper()))
            .processor(new YourConversionProcessor())
            .writer(createFileWriter(new FileSystemResource(new File("converted_" + filename)), new YourOutputLineAggregator()))
            .build();
    }

    // helper method to create a split flow out of a List of steps
    private static Flow createParallelFlow(List<Step> steps) {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(steps.size());

        List<Flow> flows = steps.stream() // we have to convert the steps to flows
            .map(step -> new FlowBuilder<Flow>("flow_" + step.getName())
                .start(step)
                .build())
            .collect(Collectors.toList());

        return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
            .split(taskExecutor)
            .add(flows.toArray(new Flow[flows.size()]))
            .build();
    }

    // helper methods to create filereaders and filewriters
    public static <T> ItemReader<T> createFileReader(Resource source, LineMapper<T> lineMapper) throws Exception {
        FlatFileItemReader<T> reader = new FlatFileItemReader<>();
        reader.setEncoding("UTF-8");
        reader.setResource(source);
        reader.setLineMapper(lineMapper);
        reader.afterPropertiesSet();
        return reader;
    }

    public static <T> ItemWriter<T> createFileWriter(Resource target, LineAggregator<T> aggregator) throws Exception {
        FlatFileItemWriter<T> writer = new FlatFileItemWriter<>();
        writer.setEncoding("UTF-8");
        writer.setResource(target);
        writer.setLineAggregator(aggregator);
        writer.afterPropertiesSet();
        return writer;
    }
}