Spring Batch: Sharing Large Amounts of Data Between Steps
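I need to share a relatively large amount of data between the steps of a job implemented with Spring Batch. I know about StepExecutionContext and JobExecutionContext as the mechanisms for this. However, I have read that these must be limited in size (fewer than 2,500 characters), which is too small for my needs. In my novice, one-step Spring Batch implementation, the job looks like this: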
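```java
import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    // No comma after the last selected column, otherwise the SQL is invalid.
    private static final String GET_DATA =
            "SELECT " +
            "stuffA, " +
            "stuffB " +
            "FROM STUFF_TABLE " +
            "ORDER BY stuffA ASC";

    @Bean
    public ItemReader<StuffDto> databaseCursorItemReader(DataSource dataSource) {
        return new JdbcCursorItemReaderBuilder<StuffDto>()
                .name("cursorItemReader")
                .dataSource(dataSource)
                .sql(GET_DATA)
                .rowMapper(new BeanPropertyRowMapper<>(StuffDto.class))
                .build();
    }

    // Defined, but not yet wired into the step below.
    @Bean
    ItemProcessor<StuffDto, StuffDto> databaseXmlItemProcessor() {
        return new QueryLoggingProcessor();
    }

    @Bean
    public ItemWriter<StuffDto> databaseCursorItemWriter() {
        return new LoggingItemWriter();
    }

    // Single chunk-oriented step: read one row, then hand it to the writer.
    @Bean
    public Step databaseCursorStep(@Qualifier("databaseCursorItemReader") ItemReader<StuffDto> reader,
                                   @Qualifier("databaseCursorItemWriter") ItemWriter<StuffDto> writer,
                                   StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("databaseCursorStep")
                .<StuffDto, StuffDto>chunk(1)
                .reader(reader)
                .writer(writer)
                .build();
    }

    @Bean
    public Job databaseCursorJob(@Qualifier("databaseCursorStep") Step exampleJobStep,
                                 JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory.get("databaseCursorJob")
                .incrementer(new RunIdIncrementer())
                .flow(exampleJobStep)
                .end()
                .build();
    }
}
```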
This works fine, in the sense that I can successfully read from the database and, in the writer step, write to a LoggingItemWriter:
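```java
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemWriter;

public class LoggingItemWriter implements ItemWriter<StuffDto> {
    private static final Logger LOGGER = LoggerFactory.getLogger(LoggingItemWriter.class);
    @Override
    public void write(List<? extends StuffDto> list) throws Exception {
        // Only logs the chunk; nothing is handed on to a later step.
        LOGGER.info("Writing stuff: {}", list);
    }
}
```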
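However, I need to be able to pass the StuffDto objects (or an equivalent) as data to a second step, which will do some processing on them rather than just log them.
Assuming the step and job execution contexts are too limited, I would appreciate any ideas on how to accomplish this. Thanks.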
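For a rough sense of how quickly a 2,500-character budget runs out, the sketch below serializes hypothetical two-field rows as JSON-like text and measures the length. The Stuff class, the fixed-width values such as A0001, and the serialization format are all assumptions for illustration; Spring Batch's actual serializer output will differ in detail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Rough estimate only: the JSON-like format is an assumption,
// not the exact output of Spring Batch's execution-context serializer.
public class ContextSizeEstimate {

    // Hypothetical stand-in for StuffDto with two short string fields.
    static final class Stuff {
        final String stuffA;
        final String stuffB;

        Stuff(String stuffA, String stuffB) {
            this.stuffA = stuffA;
            this.stuffB = stuffB;
        }
    }

    // Builds n rows with fixed-width values such as A0001 / B0001.
    static List<Stuff> sampleRows(int n) {
        List<Stuff> rows = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            rows.add(new Stuff(String.format(Locale.ROOT, "A%04d", i),
                               String.format(Locale.ROOT, "B%04d", i)));
        }
        return rows;
    }

    // Serializes the rows as a JSON-like array so its length can be measured.
    static String toJsonLike(List<Stuff> rows) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < rows.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            Stuff s = rows.get(i);
            sb.append("{\"stuffA\":\"").append(s.stuffA)
              .append("\",\"stuffB\":\"").append(s.stuffB).append("\"}");
        }
        return sb.append(']').toString();
    }

    // Largest number of rows whose serialized form stays within the character budget.
    static int maxRowsWithin(int budget) {
        int n = 0;
        while (toJsonLike(sampleRows(n + 1)).length() <= budget) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        System.out.println(toJsonLike(sampleRows(100)).length()); // 3601
        System.out.println(maxRowsWithin(2500));                  // 69
    }
}
```

Under these assumptions each row costs 36 characters (35 for the object plus a separating comma), so only about 69 rows fit in 2,500 characters.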
If you don't want to write the data to a database or file system, one way to do this is as follows:
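- In your config class, create your own job context bean with the properties you need, and annotate it with @JobScope.
- Have your reader, processor and writer classes implement the org.springframework.batch.core.step.tasklet.Tasklet interface. If you want more control over the steps, they can also implement org.springframework.batch.core.StepExecutionListener.
- Use @Autowired to inject your own context object, and use its getters and setters to store and retrieve the data.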
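Stripped of the Spring wiring, the three steps above come down to steps that run in sequence and share one per-run context object. The plain-Java sketch below models only that data flow; the Step interface and runJob method are hypothetical stand-ins for Spring Batch's Tasklet steps and the @JobScope bean, and the upper-casing is placeholder business logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Plain-Java model of the pattern; Step and runJob are hypothetical, not Spring Batch APIs.
public class SharedContextSketch {

    // Plays the role of the @JobScope JobContext bean: one instance per job run.
    static final class JobContext {
        final List<String> dataToProcess = new ArrayList<>();
        final List<String> dataToWrite = new ArrayList<>();
    }

    // Hypothetical single-method step, similar in spirit to Tasklet.execute(...).
    interface Step {
        void execute(JobContext context);
    }

    // Runs reader, processor and writer in order against one fresh context
    // and returns whatever the writer step "wrote".
    static List<String> runJob(List<String> sourceRows) {
        JobContext context = new JobContext();
        List<String> written = new ArrayList<>();

        Step reader = ctx -> ctx.dataToProcess.addAll(sourceRows);    // stands in for the DB query
        Step processor = ctx -> {
            for (String row : ctx.dataToProcess) {
                ctx.dataToWrite.add(row.toUpperCase(Locale.ROOT));     // placeholder business logic
            }
        };
        Step writer = ctx -> written.addAll(ctx.dataToWrite);          // stands in for the real output

        for (Step step : List.of(reader, processor, writer)) {
            step.execute(context);
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(runJob(List.of("a1", "b2"))); // [A1, B2]
    }
}
```

In the Spring version, the job-scoped proxy plays the role of the local context variable: every step of one job execution sees the same instance, and the next execution gets a new one.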
Sample code:
Config.java
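```java
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@Configuration
@EnableBatchProcessing
@EnableScheduling
public class Config {

    @Autowired
    private SampleProcessor processor;
    @Autowired
    private SampleReader reader;
    @Autowired
    private SampleWriter writer;
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private JobLauncher monitorJobLauncher;

    // A new JobContext is created for every job execution and shared by all of its steps.
    @Bean
    @JobScope
    public JobContext getJobContexts() {
        return new JobContext();
    }

    @Bean
    public Step readerStep() {
        return stepBuilderFactory.get("reader")
                .tasklet(reader)
                .build();
    }

    @Bean
    public Step processorStep() {
        return stepBuilderFactory.get("processor")
                .tasklet(processor)
                .build();
    }

    @Bean
    public Step writerStep() {
        return stepBuilderFactory.get("writer")
                .tasklet(writer)
                .build();
    }

    @Bean
    public Job testJob() {
        return jobBuilderFactory.get("testJob")
                .start(readerStep())
                .next(processorStep())
                .next(writerStep())
                .build();
    }

    // Starts the job every second; the timestamp parameter makes each run a new job instance.
    @Scheduled(fixedRate = 1000)
    public void startJob() throws Exception {
        Map<String, JobParameter> confMap = new HashMap<>();
        confMap.put("time", new JobParameter(new Date()));
        JobParameters jobParameters = new JobParameters(confMap);
        monitorJobLauncher.run(testJob(), jobParameters);
    }
}
```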
JobContext.java
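```java
public class JobContext {
    private List<StuffDto> dataToProcess = new ArrayList<>();
    private List<StuffDto> dataToWrite = new ArrayList<>();
    public List<StuffDto> getDataToProcess() { return dataToProcess; }
    public List<StuffDto> getDataToWrite() { return dataToWrite; }
}
```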
SampleReader.java
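```java
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class SampleReader implements Tasklet, StepExecutionListener {
    @Autowired
    private JobContext context;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // Logic to perform before this step executes.
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // Logic to perform after this step executes; returning the step's own
        // exit status leaves it unchanged.
        return stepExecution.getExitStatus();
    }

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
        // Reader logic: fetch the StuffDto objects from the database and
        // add them to context.getDataToProcess().
        return RepeatStatus.FINISHED;
    }
}
```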
SampleProcessor.java
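```java
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class SampleProcessor implements Tasklet {
    @Autowired
    private JobContext context;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
        // Processor logic: read context.getDataToProcess(), apply the business logic,
        // and add the results to context.getDataToWrite().
        return RepeatStatus.FINISHED;
    }
}
```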
Use the same approach for the writer class.
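Note: with this approach you have to write the database-related boilerplate code yourself. In return, you get more control over your logic and don't have to worry about the context size limit. All of the data is held in memory, so it has to fit in the heap and it is not persisted to the job repository (a restarted job will not see it); once the job completes, it becomes eligible for garbage collection. I hope this gets the idea across.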
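To illustrate the lifecycle described in the note, here is a simplified plain-Java model of job-scoped state: a context is registered when a run starts and removed when it ends, so no data carries over between runs and the registry holds nothing afterwards. The registry keyed by a hypothetical execution id is an assumption for illustration; it is a simplification, not Spring's actual JobScope implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of job-scoped state; not Spring's actual JobScope implementation.
public class JobScopeLifecycleSketch {

    static final class JobContext {
        final List<String> dataToProcess = new ArrayList<>();
    }

    // Live contexts keyed by a hypothetical job execution id.
    private final Map<Long, JobContext> liveContexts = new HashMap<>();
    private long nextExecutionId = 1;

    // Runs one job execution and returns how many rows its context held at the end.
    int runJob(List<String> rows) {
        long executionId = nextExecutionId++;
        JobContext context = new JobContext();          // fresh context for this run
        liveContexts.put(executionId, context);
        try {
            context.dataToProcess.addAll(rows);         // steps fill and read the shared context
            return context.dataToProcess.size();
        } finally {
            liveContexts.remove(executionId);           // scope ends: registry drops the context
        }
    }

    int liveContextCount() {
        return liveContexts.size();
    }

    public static void main(String[] args) {
        JobScopeLifecycleSketch launcher = new JobScopeLifecycleSketch();
        System.out.println(launcher.runJob(List.of("a", "b", "c"))); // 3
        System.out.println(launcher.runJob(List.of("d")));           // 1 (nothing carried over)
        System.out.println(launcher.liveContextCount());             // 0
    }
}
```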
To learn more about Tasklet vs. Chunk, read this.