使用 JpaPagingItemReader 时无法根据不同的 pageSize 和 ChunkSize 获取所有记录
Unable to get all Records based on different pageSize and ChunkSize while using JpaPagingItemReader
我需要确定 14 条记录的范围。
块大小为 10
页面大小为 2。
它仅限定 10 条记录。
我用不同的方式检查过。
块大小 = 5
页面大小 = 10
仍然只限定 10 条记录,而不是全部 14 条记录。
仅当 chunksize =11 且 pageSize =10 或 chunkSize = 10 且 pageSize = 20 时才能正常工作
build.gradle
partition:
defaultPartitionSize: 5
partitionScopeChunkSize: 10
jobs:
jpaPagingSize: 2
===================ReaderClass============================
public class PagingItemReader extends
JpaPagingItemReader<ScopeParams> {
public PagingItemReader (
EntityManager entityManager,
EntityManagerFactory entityManagerFactory,
@Value("${spring.jobs.jpaPagingSize}") int jpaPagingSize)
Map<String, Object> parameterValues = new HashMap<>();
this.setQueryProvider(
ScopeParamsQueryProvider.buildForContinuousMatchScoping(
entityManager,
IndustryCodes.valueFromCode(industryCd)));
this.setEntityManagerFactory(entityManagerFactory);
this.setPageSize(jpaPagingSize);
this.setSaveState(true);
this.setParameterValues(parameterValues);
}
}
==============WriterClass==========
public class JpaItemWriter<T> extends JpaItemWriter<T> {
private JpaRepository<T, ? extends Serializable> repository;
public JpaItemWriter(JpaRepository<T, ?> repository) {
this.repository = repository;
}
@Override
@Transactional
public void write(List<? extends T> items) {
persistEntities(items);
}
private void persistEntities(List<? extends T> list) {
list.stream()
.peek(item -> log.info("Writing={}", item))
.forEach(repository::save);
}
}
===================Step Configuration========
public Step WorkStep(StepBuilderFactory stepBuilderFactory,
PagingItemReader ItemReader,
ItemProcessor ItemProcessor,
JpaItemWriter<Scope> itemWriter) {
return stepBuilderFactory.get(WORK_MATCH)
.<Scope, ExecutionScope>chunk(10)
.reader(ItemReader)
.processor(ItemProcessor)
.writer(itemWriter)
.build();
}
处理器代码,
public class MatchItemProcessor implements ItemProcessor<Scope,ExecutionScope> {
public ExecutionScope process(Scope financialTransaction) throws Exception {
return batchExecutionScope;
}
}
private ExecutionScope prepareData(Scope transaction) { ExecutionScope executionScope = new ExecutionScope(); executionScope .setIndustryTypeCode(financialTransaction.getIndustryTypeCode()); return executionScope ; }
我正在使用发生读取的相同字段更新处理器中的其他对象。所以我正在阅读 reader class 中的 "Scope" 实体。在处理器中 class 创建 execitionScope 对象并根据范围更新值并在数据库中持久化 execitionScope。
两个实体都指向不同的 table。 ScopeParam
命中 fin_t
table 和 ExecutionScope
命中 exec_scope table.
请给我建议。
问题已解决。
我在这方面得到了帮助 link。
Spring batch jpaPagingItemReader why some rows are not read?
实际问题
JPAPagingItemReader 使用偏移量和限制,如果您的范围查询输出作为 writer/chunking 的一部分被修改,那么下一页将已经有一个修改过的数据集,并且偏移量将继续跳过未处理的数据。
由于我们的范围查询忽略了已经作为任何活动批次一部分的范围的事务,因此一旦第一个分页集被丢弃,它们就会被遗漏。
解决方法
修改了我的范围查询并忽略当前 运行 作业。
我需要确定 14 条记录的范围。 块大小为 10 页面大小为 2。 它仅限定 10 条记录。
我用不同的方式检查过。 块大小 = 5 页面大小 = 10 仍然只限定 10 条记录,而不是全部 14 条记录。
仅当 chunksize =11 且 pageSize =10 或 chunkSize = 10 且 pageSize = 20 时才能正常工作
build.gradle
partition:
defaultPartitionSize: 5
partitionScopeChunkSize: 10
jobs:
jpaPagingSize: 2
===================ReaderClass============================
public class PagingItemReader extends
JpaPagingItemReader<ScopeParams> {
public PagingItemReader (
EntityManager entityManager,
EntityManagerFactory entityManagerFactory,
@Value("${spring.jobs.jpaPagingSize}") int jpaPagingSize)
Map<String, Object> parameterValues = new HashMap<>();
this.setQueryProvider(
ScopeParamsQueryProvider.buildForContinuousMatchScoping(
entityManager,
IndustryCodes.valueFromCode(industryCd)));
this.setEntityManagerFactory(entityManagerFactory);
this.setPageSize(jpaPagingSize);
this.setSaveState(true);
this.setParameterValues(parameterValues);
}
}
==============WriterClass==========
public class JpaItemWriter<T> extends JpaItemWriter<T> {
private JpaRepository<T, ? extends Serializable> repository;
public JpaItemWriter(JpaRepository<T, ?> repository) {
this.repository = repository;
}
@Override
@Transactional
public void write(List<? extends T> items) {
persistEntities(items);
}
private void persistEntities(List<? extends T> list) {
list.stream()
.peek(item -> log.info("Writing={}", item))
.forEach(repository::save);
}
}
===================Step Configuration========
public Step WorkStep(StepBuilderFactory stepBuilderFactory,
PagingItemReader ItemReader,
ItemProcessor ItemProcessor,
JpaItemWriter<Scope> itemWriter) {
return stepBuilderFactory.get(WORK_MATCH)
.<Scope, ExecutionScope>chunk(10)
.reader(ItemReader)
.processor(ItemProcessor)
.writer(itemWriter)
.build();
}
处理器代码,
public class MatchItemProcessor implements ItemProcessor<Scope,ExecutionScope> {
public ExecutionScope process(Scope financialTransaction) throws Exception {
return batchExecutionScope;
}
}
private ExecutionScope prepareData(Scope transaction) { ExecutionScope executionScope = new ExecutionScope(); executionScope .setIndustryTypeCode(financialTransaction.getIndustryTypeCode()); return executionScope ; }
我正在使用发生读取的相同字段更新处理器中的其他对象。所以我正在阅读 reader class 中的 "Scope" 实体。在处理器中 class 创建 execitionScope 对象并根据范围更新值并在数据库中持久化 execitionScope。
两个实体都指向不同的 table。 ScopeParam
命中 fin_t
table 和 ExecutionScope
命中 exec_scope table.
请给我建议。
问题已解决。 我在这方面得到了帮助 link。 Spring batch jpaPagingItemReader why some rows are not read?
实际问题
JPAPagingItemReader 使用偏移量和限制,如果您的范围查询输出作为 writer/chunking 的一部分被修改,那么下一页将已经有一个修改过的数据集,并且偏移量将继续跳过未处理的数据。 由于我们的范围查询忽略了已经作为任何活动批次一部分的范围的事务,因此一旦第一个分页集被丢弃,它们就会被遗漏。
解决方法 修改了我的范围查询并忽略当前 运行 作业。