Spring Batch thread-safe ItemReader (process indicator pattern)

I have implemented Remote Chunking using AMQP (RabbitMQ). Now I need to run parallel jobs from within a web container.

My simple controller (testJob uses remote chunking):

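import java.util.Date;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

@Controller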
public class JobController {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job testJob;

    @RequestMapping("/job/test")
    public void test() {
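        // A fresh date parameter makes every request start a new JobInstance.
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
        jobParametersBuilder.addDate("date", new Date());
        try {
            // Launch the injected job with the parameters built above.
            jobLauncher.run(testJob, jobParametersBuilder.toJobParameters());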
        } catch (JobExecutionAlreadyRunningException | JobRestartException | JobParametersInvalidException | JobInstanceAlreadyCompleteException e) {
            e.printStackTrace();
        }

    }

}

testJob reads data from the file system (master chunk) and sends it to the remote chunk (slave chunk). The problem is that the ItemReader is not thread-safe.

There are some practical limitations of using multi-threaded Steps for some common Batch use cases. Many participants in a Step (e.g. readers and writers) are stateful, and if the state is not segregated by thread, then those components are not usable in a multi-threaded Step. In particular most of the off-the-shelf readers and writers from Spring Batch are not designed for multi-threaded use. It is, however, possible to work with stateless or thread safe readers and writers, and there is a sample (parallelJob) in the Spring Batch Samples that show the use of a process indicator (see Section 6.12, “Preventing State Persistence”) to keep track of items that have been processed in a database input table.

I have been looking at the parallelJob sample in the Spring Batch GitHub repository: https://github.com/spring-projects/spring-batch/blob/master/spring-batch-samples/src/main/java/org/springframework/batch/sample/common/StagingItemReader.java

I am a bit confused about the process indicator pattern. Where can I find more details about this pattern?

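If your only concern is that the ItemReader instance will be shared across job invocations, you can declare the ItemReader as step scoped. Each invocation then gets a new instance, which removes the threading issue.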
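
To make the effect of step scope concrete, here is a minimal plain-Java analogy rather than actual Spring configuration. CursorReader and runJob are hypothetical names invented for this sketch. The Supplier plays the role of the container: returning the same object mimics a singleton reader bean, while returning a new object mimics what a reader declared with @StepScope (or scope="step" in XML) gives each step execution.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

final class ReaderPerExecutionSketch {

    /** Hypothetical stateful reader: the cursor is the state that must not be shared. */
    static final class CursorReader {
        private final List<String> items;
        private int cursor = 0;

        CursorReader(List<String> items) {
            this.items = items;
        }

        /** Returns the next item, or null when the input is exhausted. */
        String read() {
            return cursor < items.size() ? items.get(cursor++) : null;
        }
    }

    /** One "job run": obtains its reader from the supplier and drains it. */
    static List<String> runJob(Supplier<CursorReader> readerSource) {
        CursorReader reader = readerSource.get();
        List<String> seen = new ArrayList<>();
        String item;
        while ((item = reader.read()) != null) {
            seen.add(item);
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c");

        // Singleton-like wiring: every run receives the same instance.
        CursorReader shared = new CursorReader(input);
        System.out.println(runJob(() -> shared)); // prints [a, b, c]
        System.out.println(runJob(() -> shared)); // prints [] because the cursor is already at the end

        // Step-scope-like wiring: every run receives a fresh instance.
        System.out.println(runJob(() -> new CursorReader(input))); // prints [a, b, c]
        System.out.println(runJob(() -> new CursorReader(input))); // prints [a, b, c]
    }
}
```

Note that this only isolates separate invocations from each other. It does not make a single reader instance safe when several threads inside one step call read() on it.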

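But to answer your direct question about the process indicator pattern, I'm not sure where good documentation on the pattern itself can be found. There is an example implementation of it in the Spring Batch samples (the parallel job uses it).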

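The idea behind it is that you give the records you are going to process a status. At the beginning of the job/step, you mark those records as in process. As records are committed, you mark them as processed. This removes the need to track state in the reader, because your state is actually in the database (your query only looks for records marked as in process).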
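
The flow described above can be sketched in plain Java. This is not the code from the Spring Batch samples: StagingTable, IndicatorReader and runStep are hypothetical names, a ConcurrentHashMap stands in for the database table, and the maxItems argument is an invented device to simulate a run that stops early. The point it tries to show is that the reader keeps nothing that has to be saved for a restart, because the status column records which rows are still outstanding.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ProcessIndicatorSketch {

    enum Status { NEW, IN_PROCESS, PROCESSED }

    /** In-memory stand-in (assumption) for the input table: record id -> status column. */
    static final class StagingTable {
        private final Map<Long, Status> rows = new ConcurrentHashMap<>();

        void insert(long id) {
            rows.put(id, Status.NEW);
        }

        /** Plays the role of: UPDATE staging SET status = :to WHERE id = :id AND status = :from */
        boolean updateStatus(long id, Status from, Status to) {
            return rows.replace(id, from, to);
        }

        /** Plays the role of: SELECT id FROM staging WHERE status = :status ORDER BY id */
        List<Long> findIds(Status status) {
            List<Long> ids = new ArrayList<>();
            for (Map.Entry<Long, Status> row : rows.entrySet()) {
                if (row.getValue() == status) {
                    ids.add(row.getKey());
                }
            }
            Collections.sort(ids);
            return ids;
        }
    }

    /** Thread-safe reader: hands out ids the table reports as IN_PROCESS, each at most once. */
    static final class IndicatorReader {
        private final ConcurrentLinkedQueue<Long> pending;

        IndicatorReader(StagingTable table) {
            this.pending = new ConcurrentLinkedQueue<>(table.findIds(Status.IN_PROCESS));
        }

        /** Returns the next id, or null when nothing is left. */
        Long read() {
            return pending.poll();
        }
    }

    /** Runs one step with several worker threads, processing at most maxItems records. */
    static Set<Long> runStep(StagingTable table, int threads, int maxItems) {
        // Step start: flag every record that has not been picked up yet as IN_PROCESS.
        for (Long id : table.findIds(Status.NEW)) {
            table.updateStatus(id, Status.NEW, Status.IN_PROCESS);
        }
        IndicatorReader reader = new IndicatorReader(table);
        AtomicInteger budget = new AtomicInteger(maxItems);
        Set<Long> done = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                Long id;
                while (budget.getAndDecrement() > 0 && (id = reader.read()) != null) {
                    // "Commit": the conditional update succeeds at most once per record.
                    if (table.updateStatus(id, Status.IN_PROCESS, Status.PROCESSED)) {
                        done.add(id);
                    }
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return done;
    }

    public static void main(String[] args) {
        StagingTable table = new StagingTable();
        for (long id = 1; id <= 10; id++) {
            table.insert(id);
        }
        Set<Long> first = runStep(table, 3, 4);                  // simulated partial run
        Set<Long> second = runStep(table, 3, Integer.MAX_VALUE); // rerun picks up the rest
        System.out.println(first.size() + " then " + second.size());
    }
}
```

In this sketch the second call to runStep needs no saved reader position: it simply queries for the records still marked IN_PROCESS. With a real database the status updates would be SQL statements executed inside the chunk transaction, so that a rolled-back chunk leaves its records in process.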