Spring Batch 在步骤之间共享大量数据

SpringBatch Sharing Large Amounts of Data Between Steps

我需要在 spring 批处理实施的作业步骤之间共享相对大量的数据。我知道 StepExecutionContextJobExecutionContext 作为这种机制。但是,我读到,因为这些必须限制大小(少于 2500 个字符)。这对我的需要来说太小了。在我的新手一步 Spring Batch 实现中,我的一步工作如下:

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;


    private static final String GET_DATA =
    "    SELECT " +
    "stuffA, " +
    "stuffB, " +
    "FROM STUFF_TABLE " +
    "ORDER BY stuffA ASC";

    @Bean
    public ItemReader<StuffDto> databaseCursorItemReader(DataSource dataSource) {
        return new JdbcCursorItemReaderBuilder<StuffDto>()
                .name("cursorItemReader")
            .dataSource(dataSource)
            .sql(GET_DATA)
            .rowMapper(new BeanPropertyRowMapper<>(StuffDto.class))
                .build();
    }

    @Bean
    ItemProcessor<StuffDto, StuffDto> databaseXmlItemProcessor() {
        return new QueryLoggingProcessor();
    }

    @Bean
    public ItemWriter<StuffDto> databaseCursorItemWriter() {
        return new LoggingItemWriter();
    }

    @Bean
    public Step databaseCursorStep(@Qualifier("databaseCursorItemReader") ItemReader<StuffDto> reader,
    @Qualifier("databaseCursorItemWriter") ItemWriter<StuffDto> writer,
    StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("databaseCursorStep")
            .<StuffDto, StuffDto>chunk(1)
        .reader(reader)
            .writer(writer)
            .build();
    }

    @Bean
    public Job databaseCursorJob(@Qualifier("databaseCursorStep") Step exampleJobStep,
    JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory.get("databaseCursorJob")
            .incrementer(new RunIdIncrementer())
            .flow(exampleJobStep)
            .end()
            .build();
    }
}

从我可以成功地从数据库读取并在编写器步骤中写入 loggingitemwriter 的意义上说,这工作得很好:

public class LoggingItemWriter implements ItemWriter<StuffDto> {

    private static final Logger LOGGER = LoggerFactory.getLogger(LoggingItemWriter.class);

    @Override
    public void write(List<? extends StuffDto> list) throws Exception {
        LOGGER.info("Writing stuff: {}", list);
    }
}

但是,我需要能够提供 StuffDto(或等同物),并且它是第二步的数据,该步骤将对其执行一些处理,而不仅仅是记录它。

如果您假设步骤和工作环境太有限,我将不胜感激关于如何实现这一点的任何想法。谢谢。

如果您不想将数据写入数据库或文件系统,一种实现相同的方法如下:

  1. 在具有所需属性的 config class 中创建您自己的作业上下文 bean,并用 @JobScope.
  2. 对其进行注释
  3. 为您的 reader、处理器和编写器 class 实现 org.springframework.batch.core.step.tasklet 接口。如果你想更好地控制步骤,你也可以用它实现 org.springframework.batch.core.StepExecutionListener
  4. 使用 @Autowire 获取您自己的 context 对象,并使用它的 setter-getter 方法来存储和检索数据。

示例代码:

Config.java

@Autowired
private Processor processor;

@Autowired
private Reader reader;

@Autowired
private Writer writer;

@Autowired
private JobBuilderFactory jobBuilderFactory;
     
@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
@JobScope
public JobContext getJobContexts() {
    JobContext newJobContext = new JobContext();
    return newJobContext;
}

@Bean
public Step reader() {
      return stepBuilderFactory.get("reader")
        .tasklet(reader)
        .build();
     }

@Bean
public Step processor() {
      return stepBuilderFactory.get("processor")
        .tasklet(processor)
        .build();
     }

@Bean
public Step writer() {
      return stepBuilderFactory.get("writer")
        .tasklet(writer)
        .build();
     }


public Job testJob() {
      
      return jobBuilderFactory.get("testJob")
        .start(reader())
        .next(processor())
        .next(writer())
        .build();
     }

//Below will start the job
@Scheduled(fixedRate = 1000)
    public void starJob(){
        
        Map<String, JobParameter> confMap = new HashMap<>();
        confMap.put("time", new JobParameter(new Date()));
        JobParameters jobParameters = new JobParameters(confMap);
        monitorJobLauncher.run(testJob(), jobParameters);
        
}

JobContext.java

private List<StuffDto> dataToProcess = new ArrayList<>();
private List<StuffDto> dataToWrite = new ArrayList<>();

//getter

SampleReader.java

@Component
public class SampleReader  implements Tasklet,StepExecutionListener{
        @Autowired
        private JobContext context;
       
      @Override
      public void beforeStep(StepExecution stepExecution) {
    //logic that you need to perform before the execution of this step.
      }
    
    @Override
      public void afterStep(StepExecution stepExecution) {
    //logic that you need to perform after the execution of this step.
      }
    
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext){
     // Whatever code is here will get executed for reader.
     //  Fetch StuffDto object from database and add it to jobContext 
   //dataToProcess list.
    return RepeatStatus.FINISHED;
    }
}

SampleProcessor.java

   @Component
   public class SampleProcessor  implements Tasklet{
    
       @Autowired
       private JobContext context;
    
        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext){
        
         // Whatever code is here will get executed for processor.
        // context.getDataToProcessList();
       // apply business logic and set the data to write.

return RepeatStatus.FINISHED;
    }

writer class.

的方法相同

注意:这里请注意,这里需要自己编写database-related样板代码。但是通过这种方式,您可以更好地控制您的逻辑,而不必担心上下文大小限制。所有数据都将在内存中,因此一旦操作完成,这些数据就会被垃圾收集。我希望你明白我愿意传达的意思。

要了解更多关于 TaskletChunk 的信息,请阅读 this