如何在 Spring 批处理中每个分区步骤完成后将 tasklet 添加到 运行

How to add tasklet to run after each partition step completion in Spring Batch

我是 Spring 批处理的新手,正在实施一个 spring 批处理作业,它必须从数据库中提取大量数据集并写入文件。以下是对我来说按预期工作的示例作业配置。

@Bean
public Job customDBReaderFileWriterJob() throws Exception {
    return jobBuilderFactory.get(MY_JOB)
            .incrementer(new RunIdIncrementer())
            .flow(partitionGenerationStep())
            .next(cleanupStep())
            .end()
            .build();
}

@Bean
public Step partitionGenerationStep() throws Exception {
    return stepBuilderFactory
            .get("partitionGenerationStep")
            .partitioner("Partitioner", partitioner())
            .step(multiOperationStep())
            .gridSize(50)
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
public Step multiOperationStep() throws Exception {
    return stepBuilderFactory
            .get("MultiOperationStep")
            .<Input, Output>chunk(100)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
}

@Bean
@StepScope
public DBPartitioner partitioner() {
    DBPartitioner dbPartitioner = new DBPartitioner();
    dbPartitioner.setColumn(ID);
    dbPartitioner.setDataSource(dataSource);
    dbPartitioner.setTable(TABLE);
    return dbPartitioner;
}

@Bean
@StepScope
public Reader reader() {
    return new Reader();
}

@Bean
@StepScope
public Processor processor() {
    return new Processor();
}

@Bean
@StepScope
public Writer writer() {
    return new Writer();
}    

@Bean
public Step cleanupStep() {
    return stepBuilderFactory.get("cleanupStep")
            .tasklet(cleanupTasklet())
            .build();
}

@Bean
@StepScope
public CleanupTasklet cleanupTasklet() {
    return new CleanupTasklet();
}

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(10);
    executor.setQueueCapacity(10);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setThreadNamePrefix("MultiThreaded-");
    return executor;
}    

由于数据集很大,我将任务执行器的线程池值配置为 10,网格大小为 50。使用此设置,10 个线程一次写入 10 个文件,reader 是分块读取文件,因此 reader 处理器和编写器流程迭代多次(对于一组 10,在移动到下一个分区之前)。

现在,我想添加一个 tasklet,我可以在完成一个线程的所有迭代(读取、处理、写入)后压缩文件,即在每个分区完成后。

我最后确实有一个清理 tasklet 到 运行,但是那里有压缩逻辑意味着首先从每个分区获取所有生成的文件,然后执行压缩。请提出建议。

您可以将工作步骤 multiOperationStep 更改为 FlowStep 面向块的步骤,然后是执行压缩的简单 tasklet 步骤。也就是说,worker step其实就是两个step合二为一FlowStep.