How to flush after n records are committed

I have configured two datasources in my batch application: one reads records from database 1, and the other writes records to database 2.

I am using Spring Batch to process the records in chunks. However, after inserting 3 chunks of records I get "duplicate key value violates unique constraint", even though there is no duplicate key. I have read multiple posts suggesting to flush the entity manager to remove the cached objects, but I am still confused about how to do that in my current application. And doesn't Spring Batch flush after each chunk is processed?

@Slf4j
@Configuration
@RequiredArgsConstructor
public class BatchJob {

    private final JobBuilderFactory jobBuilderFactory;

    private final StepBuilderFactory stepBuilderFactory;

    private final WriterRepository writerRepository;

    private final ReaderRepository readerRepository;

    private final BatchConfigurer batchConfigurer;

    @Qualifier("readerDatasource")
    private final HikariDataSource readerDatasource;

    private final HikariDataSource writerDatasource;

    private static final String QUERY_FIND_Reader_Person = "SELECT * FROM person where exists='Y'";

    private static final String QUERY_FIND_Reader_Address = "SELECT * from address where exists = 'Y'";

    private static final String QUERY_INSERT_Writer_Person = "INSERT INTO person (person_id,name,exists) VALUES(:personId,:name,:exists)";

    private static final String QUERY_INSERT_Writer_Address = "INSERT INTO address (id,person_id,street,exists) VALUES(:id,:personId,:street,:exists)";

    @Bean
    Job job() {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .flow(importPersonDetails())
                .next(importAddressDetails())
                .end()
                .build();
    }

    Step importPersonDetails() {
        return this.stepBuilderFactory.get("importPersonDetails")
                .<Person, Person>chunk(500)
                .reader(personItemReader())
                .writer(personWriter())
                .taskExecutor(threadPoolTaskExecutor())
                .listener(new StepListener())
                .build();
    }

    Step importAddressDetails(){
        return this.stepBuilderFactory.get("importAddressDetails")
                .<Address, Address>chunk(500)
                .reader(addressItemReader())
                .writer(addressWriter())
                .taskExecutor(threadPoolTaskExecutor())
                .listener(new StepListener())
                .build();
    }

    JdbcCursorItemReader<Person> personItemReader() {
        return new JdbcCursorItemReaderBuilder<Person>()
                .dataSource(readerDatasource)
                .sql(QUERY_FIND_Reader_Person)
                .rowMapper(new PersonRowMapper())
                .name("PersonItemReader")
                .build();
    }

    JdbcCursorItemReader<Address> addressItemReader() {
        return new JdbcCursorItemReaderBuilder<Address>()
                .dataSource(readerDatasource)
                .sql(QUERY_FIND_Reader_Address)
                .rowMapper(new AddressRowMapper())
                .name("AddressItemReader")
                .build();
    }

    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(100);
        threadPoolTaskExecutor.setMaxPoolSize(200);
        threadPoolTaskExecutor.setThreadNamePrefix("My-Batch-Jobs-TaskExecutor ");
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE);
        threadPoolTaskExecutor.initialize();
        log.info("Thread Pool Initialized with min {} and Max {} Pool Size", threadPoolTaskExecutor.getCorePoolSize(), threadPoolTaskExecutor.getMaxPoolSize());
        return threadPoolTaskExecutor;
    }

    JdbcBatchItemWriter<Person> personWriter() {
        JdbcBatchItemWriter<Person> personJdbcBatchItemWriter = new JdbcBatchItemWriter<>();
        personJdbcBatchItemWriter.setDataSource(writerDatasource);
        personJdbcBatchItemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
        personJdbcBatchItemWriter.setSql(QUERY_INSERT_Writer_Person);
        personJdbcBatchItemWriter.afterPropertiesSet();
        return personJdbcBatchItemWriter;
    }

    JdbcBatchItemWriter<Address> addressWriter() {
        JdbcBatchItemWriter<Address> addressJdbcBatchItemWriter = new JdbcBatchItemWriter<>();
        addressJdbcBatchItemWriter.setDataSource(writerDatasource);
        addressJdbcBatchItemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Address>());
        addressJdbcBatchItemWriter.setSql(QUERY_INSERT_Writer_Address);
        addressJdbcBatchItemWriter.afterPropertiesSet();
        return addressJdbcBatchItemWriter;
    }
}

"duplicate key value violates unique constraint" when there is no duplicate key`

The code will not throw this exception if there are no duplicates. So if you are getting this exception, you really do have duplicates.
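
One way to verify that is to query the source table for keys that occur more than once before blaming the writer. A minimal sketch, assuming person_id is the unique key on the target table (as the insert statement in the question suggests) and that it is numeric; readerDatasource comes from the question's code:

import java.util.List;

import javax.sql.DataSource;

import org.springframework.jdbc.core.JdbcTemplate;

public class DuplicateKeyCheck {

    // Returns the person_ids that occur more than once among the rows the
    // reader will select, i.e. the keys that would violate the target's
    // unique constraint on insert.
    static List<Long> findDuplicateKeys(DataSource readerDatasource) {
        JdbcTemplate jdbc = new JdbcTemplate(readerDatasource);
        return jdbc.queryForList(
                "SELECT person_id FROM person WHERE exists = 'Y' "
                        + "GROUP BY person_id HAVING COUNT(*) > 1",
                Long.class);
    }
}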

I have read multiple posts where it was suggested to flush entity manager to remove the cached objects

According to your configuration, you are using a JdbcBatchItemWriter and not a JpaItemWriter, so I don't see why we are talking about flushing an entity manager here. The JdbcBatchItemWriter does not use an entity manager; it uses a JdbcTemplate to send insert/update statements to your database, which are committed/rolled back when the chunk's transaction completes.
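
For reference, even if you were writing through JPA, the flushing would already be handled for you: JpaItemWriter flushes the entity manager at the end of each chunk. A minimal sketch, assuming a JPA setup that is not part of the question (Person comes from the question's code):

import javax.persistence.EntityManagerFactory;

import org.springframework.batch.item.database.JpaItemWriter;

public class JpaWriterSketch {

    // JpaItemWriter.write() calls EntityManager.flush() after writing each
    // chunk, so no manual flushing is required with this writer either.
    JpaItemWriter<Person> personJpaWriter(EntityManagerFactory entityManagerFactory) {
        JpaItemWriter<Person> writer = new JpaItemWriter<>();
        writer.setEntityManagerFactory(entityManagerFactory);
        return writer;
    }
}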

You need to make sure you are not inserting duplicates in subsequent chunks; two common ways to do that are sketched below.
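
Both of the following are assumptions on my part, not part of the original answer. Since the error message is PostgreSQL's, one option is to let the database skip rows whose key already exists; another is to filter duplicates out before they reach the writer. Person and personId come from the question's code; the Long key type is a guess:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.batch.item.ItemProcessor;

public class DedupeSketch {

    // Option 1 (PostgreSQL only): have the database ignore rows whose key
    // already exists in the target table.
    private static final String QUERY_INSERT_WRITER_PERSON =
            "INSERT INTO person (person_id,name,exists) VALUES(:personId,:name,:exists) "
                    + "ON CONFLICT (person_id) DO NOTHING";

    // Option 2: drop duplicates before they reach the writer. Returning null
    // from an ItemProcessor filters the item out of the chunk. The concurrent
    // set matters because the step runs with a TaskExecutor.
    ItemProcessor<Person, Person> dedupeProcessor() {
        Set<Long> seen = ConcurrentHashMap.newKeySet();
        return person -> seen.add(person.getPersonId()) ? person : null;
    }
}

The processor would be wired into the step with .processor(dedupeProcessor()) between .reader(...) and .writer(...).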