如何在 Spring 批处理中每个分区步骤完成后将 tasklet 添加到 运行
How to add tasklet to run after each partition step completion in Spring Batch
我是 Spring 批处理的新手,正在实施一个 spring 批处理作业,它必须从数据库中提取大量数据集并写入文件。以下是对我来说按预期工作的示例作业配置。
@Bean
public Job customDBReaderFileWriterJob() throws Exception {
return jobBuilderFactory.get(MY_JOB)
.incrementer(new RunIdIncrementer())
.flow(partitionGenerationStep())
.next(cleanupStep())
.end()
.build();
}
@Bean
public Step partitionGenerationStep() throws Exception {
return stepBuilderFactory
.get("partitionGenerationStep")
.partitioner("Partitioner", partitioner())
.step(multiOperationStep())
.gridSize(50)
.taskExecutor(taskExecutor())
.build();
}
@Bean
public Step multiOperationStep() throws Exception {
return stepBuilderFactory
.get("MultiOperationStep")
.<Input, Output>chunk(100)
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
@Bean
@StepScope
public DBPartitioner partitioner() {
DBPartitioner dbPartitioner = new DBPartitioner();
dbPartitioner.setColumn(ID);
dbPartitioner.setDataSource(dataSource);
dbPartitioner.setTable(TABLE);
return dbPartitioner;
}
@Bean
@StepScope
public Reader reader() {
return new Reader();
}
@Bean
@StepScope
public Processor processor() {
return new Processor();
}
@Bean
@StepScope
public Writer writer() {
return new Writer();
}
@Bean
public Step cleanupStep() {
return stepBuilderFactory.get("cleanupStep")
.tasklet(cleanupTasklet())
.build();
}
@Bean
@StepScope
public CleanupTasklet cleanupTasklet() {
return new CleanupTasklet();
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(10);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("MultiThreaded-");
return executor;
}
由于数据集很大,我将任务执行器的线程池值配置为 10,网格大小为 50。使用此设置,10 个线程一次写入 10 个文件,reader 是分块读取文件,因此 reader 处理器和编写器流程迭代多次(对于一组 10,在移动到下一个分区之前)。
现在,我想添加一个 tasklet,我可以在完成一个线程的所有迭代(读取、处理、写入)后压缩文件,即在每个分区完成后。
我最后确实有一个清理 tasklet 到 运行,但是那里有压缩逻辑意味着首先从每个分区获取所有生成的文件,然后执行压缩。请提出建议。
您可以将工作步骤 multiOperationStep
更改为 FlowStep
面向块的步骤,然后是执行压缩的简单 tasklet 步骤。也就是说,worker step其实就是两个step合二为一FlowStep
.
我是 Spring 批处理的新手,正在实施一个 spring 批处理作业,它必须从数据库中提取大量数据集并写入文件。以下是对我来说按预期工作的示例作业配置。
@Bean
public Job customDBReaderFileWriterJob() throws Exception {
return jobBuilderFactory.get(MY_JOB)
.incrementer(new RunIdIncrementer())
.flow(partitionGenerationStep())
.next(cleanupStep())
.end()
.build();
}
@Bean
public Step partitionGenerationStep() throws Exception {
return stepBuilderFactory
.get("partitionGenerationStep")
.partitioner("Partitioner", partitioner())
.step(multiOperationStep())
.gridSize(50)
.taskExecutor(taskExecutor())
.build();
}
@Bean
public Step multiOperationStep() throws Exception {
return stepBuilderFactory
.get("MultiOperationStep")
.<Input, Output>chunk(100)
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
@Bean
@StepScope
public DBPartitioner partitioner() {
DBPartitioner dbPartitioner = new DBPartitioner();
dbPartitioner.setColumn(ID);
dbPartitioner.setDataSource(dataSource);
dbPartitioner.setTable(TABLE);
return dbPartitioner;
}
@Bean
@StepScope
public Reader reader() {
return new Reader();
}
@Bean
@StepScope
public Processor processor() {
return new Processor();
}
@Bean
@StepScope
public Writer writer() {
return new Writer();
}
@Bean
public Step cleanupStep() {
return stepBuilderFactory.get("cleanupStep")
.tasklet(cleanupTasklet())
.build();
}
@Bean
@StepScope
public CleanupTasklet cleanupTasklet() {
return new CleanupTasklet();
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(10);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("MultiThreaded-");
return executor;
}
由于数据集很大,我将任务执行器的线程池值配置为 10,网格大小为 50。使用此设置,10 个线程一次写入 10 个文件,reader 是分块读取文件,因此 reader 处理器和编写器流程迭代多次(对于一组 10,在移动到下一个分区之前)。
现在,我想添加一个 tasklet,我可以在完成一个线程的所有迭代(读取、处理、写入)后压缩文件,即在每个分区完成后。
我最后确实有一个清理 tasklet 到 运行,但是那里有压缩逻辑意味着首先从每个分区获取所有生成的文件,然后执行压缩。请提出建议。
您可以将工作步骤 multiOperationStep
更改为 FlowStep
面向块的步骤,然后是执行压缩的简单 tasklet 步骤。也就是说,worker step其实就是两个step合二为一FlowStep
.