Spring Batch Reader is reading alternate records
I created a sample Spring Batch application that tries to read records from a database, with a writer that simply prints them. However, only the even-numbered (alternate) records are printed.
It is not a database issue: the behavior is the same with an H2 database and with an Oracle database.
There are 100 records in total in my table.
With JdbcCursorItemReader, only 50 records are read, and as can be seen from the logs they are alternate records.
With JdbcPagingItemReader, only 5 records are read, and again the logs show alternate records.
My configuration is below. Why does the reader skip the odd-numbered records?
@Bean
public ItemWriter<Safety> safetyWriter() {
    return items -> {
        for (Safety item : items) {
            log.info(item.toString());
        }
    };
}

@Bean
public JdbcCursorItemReader<Safety> cursorItemReader() throws Exception {
    JdbcCursorItemReader<Safety> reader = new JdbcCursorItemReader<>();
    reader.setSql("select * from safety ");
    reader.setDataSource(dataSource);
    reader.setRowMapper(new SafetyRowMapper());
    reader.setVerifyCursorPosition(false);
    reader.afterPropertiesSet();
    return reader;
}

@Bean
JdbcPagingItemReader<Safety> safetyPagingItemReader() throws Exception {
    JdbcPagingItemReader<Safety> reader = new JdbcPagingItemReader<>();
    reader.setDataSource(dataSource);
    reader.setFetchSize(10);
    reader.setRowMapper(new SafetyRowMapper());

    H2PagingQueryProvider queryProvider = new H2PagingQueryProvider();
    queryProvider.setSelectClause("*");
    queryProvider.setFromClause("safety");
    Map<String, Order> sortKeys = new HashMap<>(1);
    sortKeys.put("id", Order.ASCENDING);
    queryProvider.setSortKeys(sortKeys);
    reader.setQueryProvider(queryProvider);

    return reader;
}

@Bean
public Step importSafetyDetails() throws Exception {
    return stepBuilderFactory.get("importSafetyDetails")
            .<Safety, Safety>chunk(chunkSize)
            //.reader(cursorItemReader())
            .reader(safetyPagingItemReader())
            .writer(safetyWriter())
            .listener(new StepListener())
            .listener(new ChunkListener())
            .build();
}

@Bean
public Job job() throws Exception {
    return jobBuilderFactory.get("job")
            .start(importSafetyDetails())
            .build();
}
The domain class looks like this:
@NoArgsConstructor
@AllArgsConstructor
@Data
public class Safety {
    private int id;
}

public class SafetyRowMapper implements RowMapper<Safety> {
    @Override
    public Safety mapRow(ResultSet resultSet, int i) throws SQLException {
        if (resultSet.next()) {
            Safety safety = new Safety();
            safety.setId(resultSet.getInt("id"));
            return safety;
        }
        return null;
    }
}

@SpringBootApplication
@EnableBatchProcessing
public class SpringBatchSamplesApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringBatchSamplesApplication.class, args);
    }
}
The application.yml configuration is as follows:
spring:
  application:
    name: spring-batch-samples
  main:
    allow-bean-definition-overriding: true
  datasource:
    url: jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
    username: sa
    password:
    driver-class-name: org.h2.Driver
    hikari:
      connection-timeout: 20000
      maximum-pool-size: 10
  h2:
    console:
      enabled: true
  batch:
    initialize-schema: never
server:
  port: 9090
The SQL is as follows:
CREATE TABLE safety (
    id int NOT NULL,
    CONSTRAINT PK_ID PRIMARY KEY (id)
);

INSERT INTO safety (id) VALUES (1);
...100 records are inserted
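The remaining INSERT statements are elided. Purely as an illustrative sketch (not part of the original setup), the same 100 test rows could be seeded programmatically with Spring's JdbcTemplate:

// hypothetical seeding loop for ids 1..100, using the same dataSource
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
for (int id = 1; id <= 100; id++) {
    jdbcTemplate.update("INSERT INTO safety (id) VALUES (?)", id);
}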
The listener classes are as follows:
@Slf4j
public class StepListener {
    @AfterStep
    public ExitStatus afterStep(StepExecution stepExecution) {
        log.info("In step {} ,Exit Status: {} ,Read Records: {} ,Committed Records: {} ,Skipped Read Records: {} ,Skipped Write Records: {}",
                stepExecution.getStepName(),
                stepExecution.getExitStatus().getExitCode(),
                stepExecution.getReadCount(),
                stepExecution.getCommitCount(),
                stepExecution.getReadSkipCount(),
                stepExecution.getWriteSkipCount());
        return stepExecution.getExitStatus();
    }
}

@Slf4j
public class ChunkListener {
    @BeforeChunk
    public void beforeChunk(ChunkContext context) {
        log.info("<< Before the chunk");
    }

    @AfterChunk
    public void afterChunk(ChunkContext context) {
        log.info("<< After the chunk");
    }
}
I tried to reproduce your problem but could not. It would help if you could share more of your code.
In the meantime, I created a simple job that reads the 100 records from the "safety" table and prints them to the console. It works fine.
@SpringBootApplication
@EnableBatchProcessing
public class ReaderWriterProblem implements CommandLineRunner {

    @Autowired
    DataSource dataSource;
    @Autowired
    StepBuilderFactory stepBuilderFactory;
    @Autowired
    JobBuilderFactory jobBuilderFactory;
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private ApplicationContext context;

    public static void main(String[] args) {
        String[] arguments = new String[]{LocalDateTime.now().toString()};
        SpringApplication.run(ReaderWriterProblem.class, arguments);
    }

    @Bean
    public ItemWriter<Safety> safetyWriter() {
        return new ItemWriter<Safety>() {
            @Override
            public void write(List<? extends Safety> items) throws Exception {
                for (Safety item : items) {
                    //log.info(item.toString());
                    System.out.println(item);
                }
            }
        };
    }

    // @Bean
    // public JdbcCursorItemReader<Safety> cursorItemReader() throws Exception {
    //     JdbcCursorItemReader<Safety> reader = new JdbcCursorItemReader<>();
    //
    //     reader.setSql("select * from safety ");
    //     reader.setDataSource(dataSource);
    //     reader.setRowMapper(new SafetyRowMapper());
    //     reader.setVerifyCursorPosition(false);
    //     reader.afterPropertiesSet();
    //
    //     return reader;
    // }

    @Bean
    JdbcPagingItemReader<Safety> safetyPagingItemReader() throws Exception {
        JdbcPagingItemReader<Safety> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(dataSource);
        reader.setFetchSize(10);
        reader.setRowMapper(new SafetyRowMapper());

        PostgresPagingQueryProvider queryProvider = new PostgresPagingQueryProvider();
        queryProvider.setSelectClause("*");
        queryProvider.setFromClause("safety");
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);
        reader.setQueryProvider(queryProvider);

        return reader;
    }
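    // Aside (illustrative sketch, not from the original answer): the PagingQueryProvider should
    // match the database actually in use. With the H2 datasource from the question, an
    // H2PagingQueryProvider would be the usual choice, or a SqlPagingQueryProviderFactoryBean,
    // which detects the database type from the DataSource. A hedged sketch of that variant:
    //
    // @Bean
    // PagingQueryProvider safetyQueryProvider() throws Exception {
    //     SqlPagingQueryProviderFactoryBean factory = new SqlPagingQueryProviderFactoryBean();
    //     factory.setDataSource(dataSource);
    //     factory.setSelectClause("select *");
    //     factory.setFromClause("from safety");
    //     factory.setSortKey("id");
    //     return factory.getObject();
    // }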
    @Bean
    public Step importSafetyDetails() throws Exception {
        return stepBuilderFactory.get("importSafetyDetails")
                .<Safety, Safety>chunk(5)
                //.reader(cursorItemReader())
                .reader(safetyPagingItemReader())
                .writer(safetyWriter())
                .listener(new MyStepListener())
                .listener(new MyChunkListener())
                .build();
    }

    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job")
                .listener(new JobListener())
                .start(importSafetyDetails())
                .build();
    }

    @Override
    public void run(String... args) throws Exception {
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
        jobParametersBuilder.addString("date", LocalDateTime.now().toString());
        try {
            Job job = (Job) context.getBean("job");
            jobLauncher.run(job, jobParametersBuilder.toJobParameters());
        } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
            e.printStackTrace();
        }
    }

    public static class JobListener implements JobExecutionListener {
        @Override
        public void beforeJob(JobExecution jobExecution) {
            System.out.println("Before job");
        }

        @Override
        public void afterJob(JobExecution jobExecution) {
            System.out.println("After job");
        }
    }

    private static class SafetyRowMapper implements RowMapper<Safety> {
        @Override
        public Safety mapRow(ResultSet resultSet, int i) throws SQLException {
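            // Map the current row only; the reader is responsible for advancing the cursor,
            // so a RowMapper must not call resultSet.next() itself.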
            Safety safety = new Safety();
            safety.setId(resultSet.getInt("ID"));
            return safety;
        }
    }
    public static class MyStepListener implements StepExecutionListener {
        @Override
        public void beforeStep(StepExecution stepExecution) {
            System.out.println("Before Step");
        }

        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            System.out.println("After Step");
            return ExitStatus.COMPLETED;
        }
    }

    private static class MyChunkListener implements ChunkListener {
        @Override
        public void beforeChunk(ChunkContext context) {
            System.out.println("Before Chunk");
        }

        @Override
        public void afterChunk(ChunkContext context) {
            System.out.println("After Chunk");
        }

        @Override
        public void afterChunkError(ChunkContext context) {
        }
    }
}
Hope this helps.