Spring Batch中ItemStreamReader的并行步执行
Parallel step execution of ItemStreamReader in SpringBatch
我有一个ItemStreamReader
(extends AbstractItemCountingItemStreamItemReader
),阅读器本身还是比较快的,但是后面的处理比较费时间。
从业务的角度来看,我可以并行处理任意数量的项目。
由于我的 ItemStreamReader
正在读取一个带有 JsonParser 的大型 JSON 文件,它最终是有状态的。因此,仅向 Step
添加一个 TaskExecutor
是行不通的,并且会抛出解析异常以及 spring 批处理输出以下日志:
16:51:41.023 [main] WARN o.s.b.c.s.b.FaultTolerantStepBuilder - Asynchronous TaskExecutor detected with ItemStream reader. This is probably an error, and may lead to incorrect restart data being stored.
16:52:29.790 [jobLauncherTaskExecutor-1] WARN o.s.b.core.step.item.ChunkMonitor - No ItemReader set (must be concurrent step), so ignoring offset data.
16:52:31.908 [feed-import-1] WARN o.s.b.core.step.item.ChunkMonitor - ItemStream was opened in a different thread. Restart data could be compromised.
如何在我的Step中执行多个线程并行执行的处理?
Spring Batch 提供了多种并行处理的方法。在您的情况下,由于处理似乎是瓶颈,我建议您考虑两个选项:
AsyncItemProcessor/AsyncItemWriter
AsyncItemProcessor
和 AsyncItemWriter
协同工作以并行处理块中的项目。您可以将它们视为一种 fork/join 概念。块中的项目由单个线程正常读取。 AsyncItemProcessor
包装您的正常 ItemProcessor
并在不同的线程上执行该逻辑,returning a Future
而不是实际项目。 AsyncItemWriter
然后等待 Future
到 return 处理的项目,然后再写入它。这些 类 可在 Spring 批处理集成模块中找到。您可以在此处的文档中阅读有关它们的更多信息:http://docs.spring.io/spring-batch/trunk/reference/html/springBatchIntegration.html#asynchronous-processors
远程分块
AsyncItemProcessor
/AsyncItemWriter
范例在单个 JVM 中运行良好,但如果您需要进一步扩展处理,您可能需要查看远程分块。远程分块旨在将步骤的处理器部分扩展到单个 JVM 之外。使用 master/slave 配置,master 使用常规 ItemReader
读取输入。然后这些项目通过 Spring 集成通道发送到从站进行处理。结果可以写入从机或 returned 到主机进行写入。重要的是要注意,在这种方法中,master 读取的每个项目都将通过网络传输,因此它可能非常 IO 密集型,只有在处理瓶颈比发送消息的潜在影响更糟时才应考虑。您可以在此处的文档中阅读有关远程分块的更多信息:http://docs.spring.io/spring-batch/trunk/reference/html/springBatchIntegration.html#externalizing-batch-process-execution
我有一个ItemStreamReader
(extends AbstractItemCountingItemStreamItemReader
),阅读器本身还是比较快的,但是后面的处理比较费时间。
从业务的角度来看,我可以并行处理任意数量的项目。
由于我的 ItemStreamReader
正在读取一个带有 JsonParser 的大型 JSON 文件,它最终是有状态的。因此,仅向 Step
添加一个 TaskExecutor
是行不通的,并且会抛出解析异常以及 spring 批处理输出以下日志:
16:51:41.023 [main] WARN o.s.b.c.s.b.FaultTolerantStepBuilder - Asynchronous TaskExecutor detected with ItemStream reader. This is probably an error, and may lead to incorrect restart data being stored.
16:52:29.790 [jobLauncherTaskExecutor-1] WARN o.s.b.core.step.item.ChunkMonitor - No ItemReader set (must be concurrent step), so ignoring offset data.
16:52:31.908 [feed-import-1] WARN o.s.b.core.step.item.ChunkMonitor - ItemStream was opened in a different thread. Restart data could be compromised.
如何在我的Step中执行多个线程并行执行的处理?
Spring Batch 提供了多种并行处理的方法。在您的情况下,由于处理似乎是瓶颈,我建议您考虑两个选项:
AsyncItemProcessor/AsyncItemWriter
AsyncItemProcessor
和 AsyncItemWriter
协同工作以并行处理块中的项目。您可以将它们视为一种 fork/join 概念。块中的项目由单个线程正常读取。 AsyncItemProcessor
包装您的正常 ItemProcessor
并在不同的线程上执行该逻辑,returning a Future
而不是实际项目。 AsyncItemWriter
然后等待 Future
到 return 处理的项目,然后再写入它。这些 类 可在 Spring 批处理集成模块中找到。您可以在此处的文档中阅读有关它们的更多信息:http://docs.spring.io/spring-batch/trunk/reference/html/springBatchIntegration.html#asynchronous-processors
远程分块
AsyncItemProcessor
/AsyncItemWriter
范例在单个 JVM 中运行良好,但如果您需要进一步扩展处理,您可能需要查看远程分块。远程分块旨在将步骤的处理器部分扩展到单个 JVM 之外。使用 master/slave 配置,master 使用常规 ItemReader
读取输入。然后这些项目通过 Spring 集成通道发送到从站进行处理。结果可以写入从机或 returned 到主机进行写入。重要的是要注意,在这种方法中,master 读取的每个项目都将通过网络传输,因此它可能非常 IO 密集型,只有在处理瓶颈比发送消息的潜在影响更糟时才应考虑。您可以在此处的文档中阅读有关远程分块的更多信息:http://docs.spring.io/spring-batch/trunk/reference/html/springBatchIntegration.html#externalizing-batch-process-execution