Spring Batch中ItemStreamReader的并行步执行

Parallel step execution of ItemStreamReader in SpringBatch

我有一个ItemStreamReaderextends AbstractItemCountingItemStreamItemReader),阅读器本身还是比较快的,但是后面的处理比较费时间。 从业务的角度来看,我可以并行处理任意数量的项目。

由于我的 ItemStreamReader 正在读取一个带有 JsonParser 的大型 JSON 文件,它最终是有状态的。因此,仅向 Step 添加一个 TaskExecutor 是行不通的,并且会抛出解析异常以及 spring 批处理输出以下日志:

16:51:41.023 [main] WARN  o.s.b.c.s.b.FaultTolerantStepBuilder - Asynchronous TaskExecutor detected with ItemStream reader.  This is probably an error, and may lead to incorrect restart data being stored.
16:52:29.790 [jobLauncherTaskExecutor-1] WARN  o.s.b.core.step.item.ChunkMonitor - No ItemReader set (must be concurrent step), so ignoring offset data.
16:52:31.908 [feed-import-1] WARN  o.s.b.core.step.item.ChunkMonitor - ItemStream was opened in a different thread.  Restart data could be compromised.

如何在我的Step中执行多个线程并行执行的处理?

Spring Batch 提供了多种并行处理的方法。在您的情况下,由于处理似乎是瓶颈,我建议您考虑两个选项:

AsyncItemProcessor/AsyncItemWriter
AsyncItemProcessorAsyncItemWriter 协同工作以并行处理块中的项目。您可以将它们视为一种 fork/join 概念。块中的项目由单个线程正常读取。 AsyncItemProcessor 包装您的正常 ItemProcessor 并在不同的线程上执行该逻辑,returning a Future 而不是实际项目。 AsyncItemWriter 然后等待 Future 到 return 处理的项目,然后再写入它。这些 类 可在 Spring 批处理集成模块中找到。您可以在此处的文档中阅读有关它们的更多信息:http://docs.spring.io/spring-batch/trunk/reference/html/springBatchIntegration.html#asynchronous-processors

远程分块
AsyncItemProcessor/AsyncItemWriter 范例在单个 JVM 中运行良好,但如果您需要进一步扩展处理,您可能需要查看远程分块。远程分块旨在将步骤的处理器部分扩展到单个 JVM 之外。使用 master/slave 配置,master 使用常规 ItemReader 读取输入。然后这些项目通过 Spring 集成通道发送到从站进行处理。结果可以写入从机或 returned 到主机进行写入。重要的是要注意,在这种方法中,master 读取的每个项目都将通过网络传输,因此它可能非常 IO 密集型,只有在处理瓶颈比发送消息的潜在影响更糟时才应考虑。您可以在此处的文档中阅读有关远程分块的更多信息:http://docs.spring.io/spring-batch/trunk/reference/html/springBatchIntegration.html#externalizing-batch-process-execution