为什么我的 Spring 批处理多线程步骤在任何处理之前执行所有读取?
Why is my Spring Batch multi-threaded step executing all reads before any processing?
我正在尝试编写一个 Spring 批处理过程,用于将具有庞大架构的旧数据库中的数百万个条目转换为简化的 JSON 格式并发布 JSON 到 GCP PubSub。为了使这个过程尽可能高效,我正在尝试利用 Spring-Batch 多线程步骤。
为了测试我的进程,我从小开始,页面大小和块大小为 5,要处理的条目总数限制为 20 个,线程池只有 1 个线程。我正在尝试逐步完成该过程以验证它是否按我预期的那样工作 - 但事实并非如此。
我预计将我的 RepositoryItemReader 配置为页面大小为 5,会导致它仅从数据库中读取 5 条记录 - 然后在读取下 5 条记录之前以 5 条记录的形式处理这些记录。但事实并非如此发生了什么。相反,在日志中,因为我启用了 hibernate show-sql,所以我可以看到 reader 在任何处理开始之前读取了所有 20 条记录。
为什么我的多线程步骤在执行任何处理之前执行所有读取?我配置错误了吗?显然,我不希望我的作业在开始处理任何内容之前尝试将数百万个 DTO 加载到内存中...
这是我配置作业的方式:
@Configuration
public class ConversionBatchJobConfig {
@Bean
public SimpleCompletionPolicy processChunkSize(@Value("${commit.chunk.size:5}") Integer chunkSize) {
return new SimpleCompletionPolicy(chunkSize);
}
@Bean
@StepScope
public ItemStreamReader<DbProjection> dbReader(
MyDomainRepository myDomainRepository,
@Value("#{jobParameters[pageSize]}") Integer pageSize, //pageSize and chunkSize both 5 for now
@Value("#{jobParameters[limit]}") Integer limit) { //limit is 40
RepositoryItemReader<DbProjection> myDomainRepositoryReader = new RepositoryItemReader<>();
myDomainRepositoryReader.setRepository(myDomainRepository);
myDomainRepositoryReader.setMethodName("findActiveDbDomains"); //A native query
myDomainRepositoryReader.setArguments(new ArrayList<Object>() {{
add("ACTIVE");
}});
myDomainRepositoryReader.setSort(new HashMap<String, Sort.Direction>() {{
put("update_date", Sort.Direction.ASC);
}});
myDomainRepositoryReader.setPageSize(pageSize);
myDomainRepositoryReader.setMaxItemCount(limit);
myDomainRepositoryReader.setSaveState(false);
return myDomainRepositoryReader;
}
@Bean
@StepScope
public ItemProcessor<DbProjection, JsonMessage> dataConverter(DataRetrievalService dataRetrievalService) {
return new DbProjectionToJsonMessageConverter(dataRetrievalService);
}
@Bean
@StepScope
public ItemWriter<JsonMessage> jsonPublisher(GcpPubsubPublisherService publisherService) {
return new JsonMessageWriter(publisherService);
}
@Bean
public Step conversionProcess(SimpleCompletionPolicy processChunkSize,
ItemStreamReader<DbProjection> dbReader,
ItemProcessor<DbProjection, JsonMessage> dataConverter,
ItemWriter<JsonMessage> jsonPublisher,
StepBuilderFactory stepBuilderFactory,
TaskExecutor conversionThreadPool,
@Value("${conversion.failure.limit:20}") int maximumFailures) {
return stepBuilderFactory.get("conversionProcess")
.<DbProjection, JsonMessage>chunk(processChunkSize)
.reader(dbReader)
.processor(dataConverter)
.writer(jsonPublisher)
.faultTolerant()
.skipPolicy(new MyCustomConversionSkipPolicy(maximumFailures))
// ^ for now this returns true for everything until 20 failures
.listener(new MyConversionSkipListener(processStatus))
// ^ for now this just logs the error
.taskExecutor(conversionThreadPool)
.build();
}
@Bean
public Job conversionJob(Step conversionProcess,
JobBuilderFactory jobBuilderFactory) {
return jobBuilderFactory.get("conversionJob")
.start(conversionProcess)
.build();
}
}
您需要检查 hibernate.jdbc.fetch_size
的值并相应地进行设置。
pageSize
和fetchSize
是不同的参数。您可以在此处找到有关差异的更多详细信息:。因此,在您的情况下,如果 fetchSize
大于 pageSize
,则获取的记录可能多于页面大小。
我正在尝试编写一个 Spring 批处理过程,用于将具有庞大架构的旧数据库中的数百万个条目转换为简化的 JSON 格式并发布 JSON 到 GCP PubSub。为了使这个过程尽可能高效,我正在尝试利用 Spring-Batch 多线程步骤。
为了测试我的进程,我从小开始,页面大小和块大小为 5,要处理的条目总数限制为 20 个,线程池只有 1 个线程。我正在尝试逐步完成该过程以验证它是否按我预期的那样工作 - 但事实并非如此。
我预计将我的 RepositoryItemReader 配置为页面大小为 5,会导致它仅从数据库中读取 5 条记录 - 然后在读取下 5 条记录之前以 5 条记录的形式处理这些记录。但事实并非如此发生了什么。相反,在日志中,因为我启用了 hibernate show-sql,所以我可以看到 reader 在任何处理开始之前读取了所有 20 条记录。
为什么我的多线程步骤在执行任何处理之前执行所有读取?我配置错误了吗?显然,我不希望我的作业在开始处理任何内容之前尝试将数百万个 DTO 加载到内存中...
这是我配置作业的方式:
@Configuration
public class ConversionBatchJobConfig {
@Bean
public SimpleCompletionPolicy processChunkSize(@Value("${commit.chunk.size:5}") Integer chunkSize) {
return new SimpleCompletionPolicy(chunkSize);
}
@Bean
@StepScope
public ItemStreamReader<DbProjection> dbReader(
MyDomainRepository myDomainRepository,
@Value("#{jobParameters[pageSize]}") Integer pageSize, //pageSize and chunkSize both 5 for now
@Value("#{jobParameters[limit]}") Integer limit) { //limit is 40
RepositoryItemReader<DbProjection> myDomainRepositoryReader = new RepositoryItemReader<>();
myDomainRepositoryReader.setRepository(myDomainRepository);
myDomainRepositoryReader.setMethodName("findActiveDbDomains"); //A native query
myDomainRepositoryReader.setArguments(new ArrayList<Object>() {{
add("ACTIVE");
}});
myDomainRepositoryReader.setSort(new HashMap<String, Sort.Direction>() {{
put("update_date", Sort.Direction.ASC);
}});
myDomainRepositoryReader.setPageSize(pageSize);
myDomainRepositoryReader.setMaxItemCount(limit);
myDomainRepositoryReader.setSaveState(false);
return myDomainRepositoryReader;
}
@Bean
@StepScope
public ItemProcessor<DbProjection, JsonMessage> dataConverter(DataRetrievalService dataRetrievalService) {
return new DbProjectionToJsonMessageConverter(dataRetrievalService);
}
@Bean
@StepScope
public ItemWriter<JsonMessage> jsonPublisher(GcpPubsubPublisherService publisherService) {
return new JsonMessageWriter(publisherService);
}
@Bean
public Step conversionProcess(SimpleCompletionPolicy processChunkSize,
ItemStreamReader<DbProjection> dbReader,
ItemProcessor<DbProjection, JsonMessage> dataConverter,
ItemWriter<JsonMessage> jsonPublisher,
StepBuilderFactory stepBuilderFactory,
TaskExecutor conversionThreadPool,
@Value("${conversion.failure.limit:20}") int maximumFailures) {
return stepBuilderFactory.get("conversionProcess")
.<DbProjection, JsonMessage>chunk(processChunkSize)
.reader(dbReader)
.processor(dataConverter)
.writer(jsonPublisher)
.faultTolerant()
.skipPolicy(new MyCustomConversionSkipPolicy(maximumFailures))
// ^ for now this returns true for everything until 20 failures
.listener(new MyConversionSkipListener(processStatus))
// ^ for now this just logs the error
.taskExecutor(conversionThreadPool)
.build();
}
@Bean
public Job conversionJob(Step conversionProcess,
JobBuilderFactory jobBuilderFactory) {
return jobBuilderFactory.get("conversionJob")
.start(conversionProcess)
.build();
}
}
您需要检查 hibernate.jdbc.fetch_size
的值并相应地进行设置。
pageSize
和fetchSize
是不同的参数。您可以在此处找到有关差异的更多详细信息:。因此,在您的情况下,如果 fetchSize
大于 pageSize
,则获取的记录可能多于页面大小。