How to flush after n records are committed
I have configured two datasources in my batch application: one reads records from database 1, and the other writes records to database 2.
I am using Spring Batch to process the records in chunks. However, after inserting 3 chunks of records I get "duplicate key value violates unique constraint", even though there is no duplicate key. I have read multiple posts where it was suggested to flush the entity manager to remove the cached objects, but I am still confused about how to do that in my current application. Also, doesn't Spring Batch flush after each chunk is processed?
@Slf4j
@Configuration
@RequiredArgsConstructor
public class BatchJob {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final WriterRepository writerRepository;
    private final ReaderRepository readerRepository;
    private final BatchConfigurer batchConfigurer;
    @Qualifier("readerDatasource")
    private final HikariDataSource readerDatasource;
    private final HikariDataSource writerDatasource;

    private static final String QUERY_FIND_Reader_Person = "SELECT * FROM person WHERE exists = 'Y'";
    private static final String QUERY_FIND_Reader_Address = "SELECT * FROM address WHERE exists = 'Y'";
    private static final String QUERY_INSERT_Writer_Person = "INSERT INTO person (person_id, name, exists) VALUES (:personId, :name, :exists)";
    private static final String QUERY_INSERT_Writer_Address = "INSERT INTO address (id, person_id, street, exists) VALUES (:id, :personId, :street, :exists)";

    @Bean
    Job job() {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .flow(importPersonDetails())
                .next(importAddressDetails())
                .end()
                .build();
    }

    Step importPersonDetails() {
        return this.stepBuilderFactory.get("importPersonDetails")
                .<Person, Person>chunk(500)
                .reader(personItemReader())
                .writer(personWriter())
                .taskExecutor(threadPoolTaskExecutor())
                .listener(new StepListener())
                .build();
    }

    Step importAddressDetails() {
        return this.stepBuilderFactory.get("importAddressDetails")
                .<Address, Address>chunk(500)
                .reader(addressItemReader())
                .writer(addressWriter())
                .taskExecutor(threadPoolTaskExecutor())
                .listener(new StepListener())
                .build();
    }

    JdbcCursorItemReader<Person> personItemReader() {
        return new JdbcCursorItemReaderBuilder<Person>()
                .dataSource(readerDatasource)
                .sql(QUERY_FIND_Reader_Person)
                .rowMapper(new PersonRowMapper())
                .name("PersonItemReader")
                .build();
    }

    JdbcCursorItemReader<Address> addressItemReader() {
        return new JdbcCursorItemReaderBuilder<Address>()
                .dataSource(readerDatasource)
                .sql(QUERY_FIND_Reader_Address)
                .rowMapper(new AddressRowMapper())
                .name("AddressItemReader")
                .build();
    }

    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(100);
        threadPoolTaskExecutor.setMaxPoolSize(200);
        threadPoolTaskExecutor.setThreadNamePrefix("My-Batch-Jobs-TaskExecutor ");
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE);
        threadPoolTaskExecutor.initialize();
        log.info("Thread pool initialized with min {} and max {} pool size",
                threadPoolTaskExecutor.getCorePoolSize(), threadPoolTaskExecutor.getMaxPoolSize());
        return threadPoolTaskExecutor;
    }

    JdbcBatchItemWriter<Person> personWriter() {
        JdbcBatchItemWriter<Person> personJdbcBatchItemWriter = new JdbcBatchItemWriter<>();
        personJdbcBatchItemWriter.setDataSource(writerDatasource);
        personJdbcBatchItemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
        personJdbcBatchItemWriter.setSql(QUERY_INSERT_Writer_Person);
        personJdbcBatchItemWriter.afterPropertiesSet();
        return personJdbcBatchItemWriter;
    }

    JdbcBatchItemWriter<Address> addressWriter() {
        JdbcBatchItemWriter<Address> addressJdbcBatchItemWriter = new JdbcBatchItemWriter<>();
        addressJdbcBatchItemWriter.setDataSource(writerDatasource);
        addressJdbcBatchItemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Address>());
        addressJdbcBatchItemWriter.setSql(QUERY_INSERT_Writer_Address);
        addressJdbcBatchItemWriter.afterPropertiesSet();
        return addressJdbcBatchItemWriter;
    }
}
"duplicate key value violates unique constraint" when there is no duplicate key`
If there were no duplicates, the code would not throw this exception. So if you are getting this exception, you really do have duplicates.
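As a sanity check, you can look for the conflicting keys directly on the reader side. A minimal sketch, assuming the person table and person_id column from your configuration (the DuplicateKeyCheck class and findDuplicateIds method are hypothetical names):

import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.jdbc.core.JdbcTemplate;

public class DuplicateKeyCheck {

    // Lists person_id values that occur more than once among the rows being
    // migrated: each of these would trip a unique constraint on insert.
    public static List<Map<String, Object>> findDuplicateIds(DataSource readerDatasource) {
        return new JdbcTemplate(readerDatasource).queryForList(
                "SELECT person_id, COUNT(*) AS cnt FROM person "
                        + "WHERE exists = 'Y' GROUP BY person_id HAVING COUNT(*) > 1");
    }
}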
I have read multiple posts where it was suggested to flush entity manager to remove the cached objects
According to your configuration, you are using a JdbcBatchItemWriter and not a JpaItemWriter, so I don't see why we are talking about flushing the entity manager here. JdbcBatchItemWriter does not use an entity manager; it uses a JDBC template to send the insert/update statements to your database, and these are committed/rolled back once the transaction completes.
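To make that concrete, here is roughly what happens for each chunk. This is an illustrative sketch of the mechanism, not the actual JdbcBatchItemWriter source; ChunkWriteSketch and writeChunk are hypothetical names, and the Person type is the one from your question:

import java.util.List;
import org.springframework.jdbc.core.namedparam.BeanPropertySqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;

public class ChunkWriteSketch {

    // One batchUpdate per chunk: the named parameters (:personId, :name, ...)
    // are bound from bean properties, and the statements are committed or
    // rolled back by the step's surrounding transaction. No entity manager
    // and no first-level cache are involved, so there is nothing to flush.
    public static void writeChunk(NamedParameterJdbcTemplate template, String sql, List<Person> chunk) {
        SqlParameterSource[] params = chunk.stream()
                .map(BeanPropertySqlParameterSource::new)
                .toArray(SqlParameterSource[]::new);
        template.batchUpdate(sql, params);
    }
}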
You need to make sure you are not inserting duplicates in subsequent chunks.
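One way to enforce that is sketched below, under a few assumptions: it supposes person_id is the unique key being violated, that Person exposes it as a Long via getPersonId() (your insert statement already binds :personId from bean properties), and the DeduplicatingPersonProcessor class name is hypothetical. Returning null from an ItemProcessor tells Spring Batch to filter the item out, so duplicates never reach the writer:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.batch.item.ItemProcessor;

public class DeduplicatingPersonProcessor implements ItemProcessor<Person, Person> {

    // Backed by a ConcurrentHashMap because the step runs with a TaskExecutor,
    // so process() may be called from several threads at once.
    private final Set<Long> seenIds = ConcurrentHashMap.newKeySet();

    @Override
    public Person process(Person person) {
        // Set.add() returns false when the id was already seen;
        // returning null filters the duplicate out of the chunk.
        return seenIds.add(person.getPersonId()) ? person : null;
    }
}

It would be wired into the step with .processor(new DeduplicatingPersonProcessor()) between .reader(...) and .writer(...), keeping the step's <Person, Person> signature. For very large inputs, deduplicating in the read query itself (e.g. with DISTINCT on the key) would avoid keeping the id set in memory.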