How do I create a custom ItemReader for each step in my Spring Batch project?
I am trying to use a custom reader, processor, and writer in each step:
@Bean
public Step step1(StepBuilderFactory stepBuilderFactory,
                  ItemReader<Assessment> reader,
                  ExpireAssessmentWriter writer,
                  AssessmentItemProcessor processor,
                  PlatformTransactionManager platformTransactionManager) {
    return stepBuilderFactory.get("step1")
            .transactionManager(platformTransactionManager)
            .<Assessment, Assessment>chunk(10)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .build();
}

// update aggregate balance table
@Bean
public Step step2(StepBuilderFactory stepBuilderFactory,
                  ItemReader<Assessment> reader,
                  BalanceItemWriter writer,
                  BalanceProcessor processor,
                  PlatformTransactionManager platformTransactionManager) {
    return stepBuilderFactory.get("step2")
            .transactionManager(platformTransactionManager)
            .<Assessment, Assessment>chunk(10)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .build();
}

@Bean
public Step step3(StepBuilderFactory stepBuilderFactory,
                  ItemReader<Assessment> reader,
                  CustomWriter3 writer,
                  CustomItemProcessor3 processor,
                  PlatformTransactionManager platformTransactionManager) {
    return stepBuilderFactory.get("step3")
            .transactionManager(platformTransactionManager)
            .<Assessment, Assessment>chunk(10)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .build();
}
The first step works fine, but only if I keep this reader in the same class:
private static final String READER_QUERY = "SELECT * FROM TABLE1 WHERE COLUMN='TEST'";

@Bean
public JdbcCursorItemReader<Assessment> reader(DataSource dataSource) {
    return new JdbcCursorItemReaderBuilder<Assessment>()
            .dataSource(dataSource)
            .name("AssessmentUtilityReader")
            .sql(READER_QUERY)
            .rowMapper(new AssessmentMapper())
            .build();
}
How can I create a custom reader for each step so that each one runs its own query?
Can I create a custom reader that extends JdbcCursorItemReader and returns the same code snippet?
@Bean
public JdbcCursorItemReader<Assessment> reader(DataSource dataSource) {
    return new JdbcCursorItemReaderBuilder<Assessment>()
            .dataSource(dataSource)
            .name("AssessmentUtilityReader")
            .sql(READER_QUERY)
            .rowMapper(new AssessmentMapper())
            .build();
}

Since the item type is the same in all steps, you can create a method that accepts a query and returns an item reader:
public JdbcCursorItemReader<Assessment> getReader(DataSource dataSource, String query) {
    return new JdbcCursorItemReaderBuilder<Assessment>()
            .dataSource(dataSource)
            .name("AssessmentUtilityReader") // can be passed as a parameter as well
            .sql(query)
            .rowMapper(new AssessmentMapper())
            .build();
}
Then call this method in each step definition, passing in the query that step needs.
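
As a rough sketch of how that could look for two of the steps, assuming Spring Batch 4.x (where StepBuilderFactory is available) and the Assessment, AssessmentMapper, processor, and writer classes from the question. The class name AssessmentStepsConfig, the extra name parameter, and the second query are hypothetical placeholders, not taken from the original code:

```java
import javax.sql.DataSource;

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

// Assumes Assessment, AssessmentMapper, the processors and the writers
// from the question live in the same package and are registered as beans.
@Configuration
public class AssessmentStepsConfig {

    private static final String STEP1_QUERY = "SELECT * FROM TABLE1 WHERE COLUMN='TEST'";
    // Hypothetical placeholder: replace with the real query for step 2.
    private static final String STEP2_QUERY = "SELECT * FROM TABLE2";

    // Plain helper method (not a @Bean): builds a new reader per call.
    private JdbcCursorItemReader<Assessment> getReader(DataSource dataSource,
                                                       String name,
                                                       String query) {
        return new JdbcCursorItemReaderBuilder<Assessment>()
                .dataSource(dataSource)
                .name(name)
                .sql(query)
                .rowMapper(new AssessmentMapper())
                .build();
    }

    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory,
                      DataSource dataSource,
                      ExpireAssessmentWriter writer,
                      AssessmentItemProcessor processor,
                      PlatformTransactionManager platformTransactionManager) {
        return stepBuilderFactory.get("step1")
                .transactionManager(platformTransactionManager)
                .<Assessment, Assessment>chunk(10)
                .reader(getReader(dataSource, "step1Reader", STEP1_QUERY))
                .processor(processor)
                .writer(writer)
                .build();
    }

    @Bean
    public Step step2(StepBuilderFactory stepBuilderFactory,
                      DataSource dataSource,
                      BalanceItemWriter writer,
                      BalanceProcessor processor,
                      PlatformTransactionManager platformTransactionManager) {
        return stepBuilderFactory.get("step2")
                .transactionManager(platformTransactionManager)
                .<Assessment, Assessment>chunk(10)
                .reader(getReader(dataSource, "step2Reader", STEP2_QUERY))
                .processor(processor)
                .writer(writer)
                .build();
    }
}
```

With this layout the steps take a DataSource instead of an injected ItemReader, so they no longer all resolve to the single reader bean. The name passed to the builder is used as the prefix for the reader's keys in the ExecutionContext, so a distinct name per step should also make the saved state easier to tell apart.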