为分区的从属步骤并行执行块
Executing chunks in parallel for a partitioned slave step
我的这个问题是对另一个 的延伸。由于这看起来不可能,我正在尝试为并行/分区从属步骤并行执行块。
Article 表示只需将 SimpleAsyncTaskExecutor
指定为步骤的任务执行器即可开始并行执行块。
@Bean
public Step masterLuceneIndexerStep() throws Exception{
return stepBuilderFactory.get("masterLuceneIndexerStep")
.partitioner(slaveLuceneIndexerStep())
.partitioner("slaveLuceneIndexerStep", partitioner())
.gridSize(Constants.PARTITIONER_GRID_SIZE)
.taskExecutor(simpleAsyntaskExecutor)
.build();
}
@Bean
public Step slaveLuceneIndexerStep()throws Exception{
return stepBuilderFactory.get("slaveLuceneIndexerStep")
.<IndexerInputVO,IndexerOutputVO> chunk(Constants.INDEXER_STEP_CHUNK_SIZE)
.reader(luceneIndexReader(null))
.processor(luceneIndexProcessor())
.writer(luceneIndexWriter(null))
.listener(luceneIndexerStepListener)
.listener(lichunkListener)
.throttleLimit(Constants.THROTTLE_LIMIT)
.build();
}
如果我指定 .taskExecutor(simpleAsyntaskExecutor)
作为从属步骤,则作业失败。主步骤中的行 .taskExecutor(simpleAsyntaskExecutor)
工作正常,但块在串行和分区步骤中并行工作。
是否可以并行化 slaveLuceneIndexerStep()
的块?
基本上,每个块都以顺序方式将 Lucene 索引写入单个目录,我想进一步并行化每个目录中的索引写入过程,因为 Lucene IndexWriter
是线程安全的。
我可以通过以下步骤从分区从属步骤中启动并行块,
1.I 首先照顾我的 reader,处理器和编写器是线程安全的,以便这些组件可以参与并行块而不会出现并发问题。
2.I 将主步骤的任务执行器保留为 SimpleAsyntaskExecutor
因为从步骤很长 运行 并且我希望在某个时间点恰好启动 N 个线程。我通过设置任务执行器的concurrencyLimit
来控制N。
3.Then 我设置了一个 ThreadPoolTaskExecutor
作为从属步骤的任务执行器。该池被所有从属步骤用作公共池,因此我将其核心池大小设置为最小 N,以便每个从属步骤至少获得一个线程,并且不会发生饥饿。您可以根据系统容量增加此线程池的大小,我使用了一个线程池,因为块是较小的 运行 个进程。
使用线程池还可以处理我的应用程序的特定情况,即我的分区是由 client_id
进行的,因此当较小的客户端完成时,相同的线程会自动被较大的客户端重用,并且 client_id 会产生不对称性分区得到处理,因为要为每个客户端处理的数据差异很大。
主步任务执行器简单地启动所有从步线程并进入WAITING
状态,而从步块由从步中指定的线程池处理。
我的这个问题是对另一个
Article 表示只需将 SimpleAsyncTaskExecutor
指定为步骤的任务执行器即可开始并行执行块。
@Bean
public Step masterLuceneIndexerStep() throws Exception{
return stepBuilderFactory.get("masterLuceneIndexerStep")
.partitioner(slaveLuceneIndexerStep())
.partitioner("slaveLuceneIndexerStep", partitioner())
.gridSize(Constants.PARTITIONER_GRID_SIZE)
.taskExecutor(simpleAsyntaskExecutor)
.build();
}
@Bean
public Step slaveLuceneIndexerStep()throws Exception{
return stepBuilderFactory.get("slaveLuceneIndexerStep")
.<IndexerInputVO,IndexerOutputVO> chunk(Constants.INDEXER_STEP_CHUNK_SIZE)
.reader(luceneIndexReader(null))
.processor(luceneIndexProcessor())
.writer(luceneIndexWriter(null))
.listener(luceneIndexerStepListener)
.listener(lichunkListener)
.throttleLimit(Constants.THROTTLE_LIMIT)
.build();
}
如果我指定 .taskExecutor(simpleAsyntaskExecutor)
作为从属步骤,则作业失败。主步骤中的行 .taskExecutor(simpleAsyntaskExecutor)
工作正常,但块在串行和分区步骤中并行工作。
是否可以并行化 slaveLuceneIndexerStep()
的块?
基本上,每个块都以顺序方式将 Lucene 索引写入单个目录,我想进一步并行化每个目录中的索引写入过程,因为 Lucene IndexWriter
是线程安全的。
我可以通过以下步骤从分区从属步骤中启动并行块,
1.I 首先照顾我的 reader,处理器和编写器是线程安全的,以便这些组件可以参与并行块而不会出现并发问题。
2.I 将主步骤的任务执行器保留为 SimpleAsyntaskExecutor
因为从步骤很长 运行 并且我希望在某个时间点恰好启动 N 个线程。我通过设置任务执行器的concurrencyLimit
来控制N。
3.Then 我设置了一个 ThreadPoolTaskExecutor
作为从属步骤的任务执行器。该池被所有从属步骤用作公共池,因此我将其核心池大小设置为最小 N,以便每个从属步骤至少获得一个线程,并且不会发生饥饿。您可以根据系统容量增加此线程池的大小,我使用了一个线程池,因为块是较小的 运行 个进程。
使用线程池还可以处理我的应用程序的特定情况,即我的分区是由 client_id
进行的,因此当较小的客户端完成时,相同的线程会自动被较大的客户端重用,并且 client_id 会产生不对称性分区得到处理,因为要为每个客户端处理的数据差异很大。
主步任务执行器简单地启动所有从步线程并进入WAITING
状态,而从步块由从步中指定的线程池处理。