Spring 批处理 - 在从数据库读取下一页之前数据未完全处理
Spring batch - data is not processed completely before reading next page from DB
我有一个 table MY_TABLE
,其中包含大约 500k 行,reader 应该从这个 table 中读取 pageSize
作为 100,然后它应该作为我的处理器处理这个页面并按照我的作者写入数据库。
发生的事情是,reader 继续阅读 100,100 页直到读完,并给出 OutOfMemoryError
大约 400k 条记录。
我不确定为什么这在页面和 chunk
中根本不起作用,以及为什么它从 table 读取所有数据而不是仅使用 pageSize
数量.
下面是我为 step
和 reader
编写的代码。
public Step customStep(StepBuilderFactory stepBuilderFactory, ItemReader<CustomEntityClass> reader,
ItemProcessor<CustomEntityClass, List<CustomEntityClassSecond>> processor, ItemWriter<List<CustomEntityClassSecond>> writer)
{
DefaultResultCompletionPolicy comp = new DefaultResultCompletionPolicy();
return stepBuilderFactory.get(BatchConstants.STEP_NAME)
.<CustomEntityClass, List<CustomEntityClassSecond>> chunk(comp)
.reader(reader)
.processor(processor)
.writer(writer).build();
}
@Bean
public Step purgeTasklet(StepBuilderFactory stepBuilderFactory, Tasklet tasklet)
{
return stepBuilderFactory.get(BatchConstants.TASKLET_NAME).tasklet(tasklet).build();
}
@Bean
public ItemReader<CustomEntityClass> customReader(@Qualifier(DATA_SOURCE) DataSource dataSource)
throws Exception
{
JdbcPagingItemReader<CustomEntityClass> reader = new JdbcPagingItemReader<>();
SqlPagingQueryProviderFactoryBean pagingQueryFactoryBean = new SqlPagingQueryProviderFactoryBean();
pagingQueryFactoryBean.setSelectClause("*");
pagingQueryFactoryBean.setFromClause("MY_TABLE");
pagingQueryFactoryBean.setWhereClause("CREATIONDATE <= :creationDate");
pagingQueryFactoryBean.setSortKey("id");
pagingQueryFactoryBean.setDataSource(dataSource);
reader.setQueryProvider(pagingQueryFactoryBean.getObject());
reader.setPageSize(100);
reader.setDataSource(dataSource);
reader.setRowMapper(new BeanPropertyRowMapper<>(CustomEntityClass.class));
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.DAY_OF_MONTH, -1);
DateFormat df = new SimpleDateFormat(BatchConstants.CREATION_DATE_FORMAT);
String creationDate = df.format(calendar.getTime());
Map<String, Object> parameters = new HashMap<>();
parameters.put("creationDate", creationDate);
reader.setParameterValues(parameters);
return reader;
}```
Spring 批处理使用的块大小与 reader 中的页面大小无关。一致地设置它们是有意义的,但这并不是严格要求的。
使用
它应该能按预期工作
stepBuilderFactory.get(BatchConstants.STEP_NAME)
.<CustomEntityClass, List<CustomEntityClassSecond>>chunk(100)
.reader(reader)
.processor(processor)
.writer(writer).build();
然后Spring批处理将使用每块 100 个项目,这恰好是一个查询页面的内容。
我有一个 table MY_TABLE
,其中包含大约 500k 行,reader 应该从这个 table 中读取 pageSize
作为 100,然后它应该作为我的处理器处理这个页面并按照我的作者写入数据库。
发生的事情是,reader 继续阅读 100,100 页直到读完,并给出 OutOfMemoryError
大约 400k 条记录。
我不确定为什么这在页面和 chunk
中根本不起作用,以及为什么它从 table 读取所有数据而不是仅使用 pageSize
数量.
下面是我为 step
和 reader
编写的代码。
public Step customStep(StepBuilderFactory stepBuilderFactory, ItemReader<CustomEntityClass> reader,
ItemProcessor<CustomEntityClass, List<CustomEntityClassSecond>> processor, ItemWriter<List<CustomEntityClassSecond>> writer)
{
DefaultResultCompletionPolicy comp = new DefaultResultCompletionPolicy();
return stepBuilderFactory.get(BatchConstants.STEP_NAME)
.<CustomEntityClass, List<CustomEntityClassSecond>> chunk(comp)
.reader(reader)
.processor(processor)
.writer(writer).build();
}
@Bean
public Step purgeTasklet(StepBuilderFactory stepBuilderFactory, Tasklet tasklet)
{
return stepBuilderFactory.get(BatchConstants.TASKLET_NAME).tasklet(tasklet).build();
}
@Bean
public ItemReader<CustomEntityClass> customReader(@Qualifier(DATA_SOURCE) DataSource dataSource)
throws Exception
{
JdbcPagingItemReader<CustomEntityClass> reader = new JdbcPagingItemReader<>();
SqlPagingQueryProviderFactoryBean pagingQueryFactoryBean = new SqlPagingQueryProviderFactoryBean();
pagingQueryFactoryBean.setSelectClause("*");
pagingQueryFactoryBean.setFromClause("MY_TABLE");
pagingQueryFactoryBean.setWhereClause("CREATIONDATE <= :creationDate");
pagingQueryFactoryBean.setSortKey("id");
pagingQueryFactoryBean.setDataSource(dataSource);
reader.setQueryProvider(pagingQueryFactoryBean.getObject());
reader.setPageSize(100);
reader.setDataSource(dataSource);
reader.setRowMapper(new BeanPropertyRowMapper<>(CustomEntityClass.class));
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.DAY_OF_MONTH, -1);
DateFormat df = new SimpleDateFormat(BatchConstants.CREATION_DATE_FORMAT);
String creationDate = df.format(calendar.getTime());
Map<String, Object> parameters = new HashMap<>();
parameters.put("creationDate", creationDate);
reader.setParameterValues(parameters);
return reader;
}```
Spring 批处理使用的块大小与 reader 中的页面大小无关。一致地设置它们是有意义的,但这并不是严格要求的。
使用
它应该能按预期工作stepBuilderFactory.get(BatchConstants.STEP_NAME)
.<CustomEntityClass, List<CustomEntityClassSecond>>chunk(100)
.reader(reader)
.processor(processor)
.writer(writer).build();
然后Spring批处理将使用每块 100 个项目,这恰好是一个查询页面的内容。