将 2 个拆分定义为 运行 一组并行的步骤
Defining 2 splits to run a set of steps in parallel
我有一个作业配置,我在其中并行加载一组文件,在加载这组文件后,我还想并行加载另一组文件,但只有在第一组文件完全加载后才能加载。第二组具有对第一组的参考字段。我以为我可以使用第二个拆分,但从未成功,在 xsd 中,您似乎可以定义多个拆分,显然流程无法帮助我满足我的要求。
那么我如何定义 2 组并行流程,每个流程 运行 按顺序排列?
<job>
<split>
<flow>
<step next="step2"/>
<step id="step2"/>
</flow>
<flow>
<step ...>
</flow>
</split>
<split ../>
Asoub 是对的,这是完全有可能的,我做了一个简单的配置并且成功了。所以我得到的原始问题似乎还有其他一些问题,这些问题在定义 2 个拆分时会导致问题。
我使用的简单配置:
<batch:job id="batchJob" restartable="true">
<batch:split id="x" next="y">
<batch:flow>
<batch:step id="a">
<batch:tasklet allow-start-if-complete="true">
<batch:chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
</batch:tasklet>
</batch:step>
</batch:flow>
<batch:flow>
<batch:step id="b">
<batch:tasklet allow-start-if-complete="true">
<batch:chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
</batch:tasklet>
</batch:step>
</batch:flow>
</batch:split>
<batch:split id="y" next="e">
<batch:flow>
<batch:step id="c">
<batch:tasklet allow-start-if-complete="true">
<batch:chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
</batch:tasklet>
</batch:step>
</batch:flow>
<batch:flow>
<batch:step id="d">
<batch:tasklet allow-start-if-complete="true">
<batch:chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
</batch:tasklet>
</batch:step>
</batch:flow>
</batch:split>
<batch:step id="e">
<batch:tasklet allow-start-if-complete="true">
<batch:chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
</batch:tasklet>
</batch:step>
</batch:job>
INFO: Job: [FlowJob: [name=batchJob]] launched with the following parameters: [{random=994444}]
Nov 23, 2016 11:33:24 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [a]
Nov 23, 2016 11:33:24 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [b]
Nov 23, 2016 11:33:24 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [c]
Nov 23, 2016 11:33:24 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [d]
Nov 23, 2016 11:33:24 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [e]
Nov 23, 2016 11:33:25 PM org.springframework.batch.core.launch.support.SimpleJobLauncher run
INFO: Job: [FlowJob: [name=batchJob]] completed with the following parameters: [{random=994444}] and the following status: [COMPLETED]
我会使用两个分区步骤。每个分区程序将负责识别其各自集合中的文件,以供并发子步骤处理
<job>
<step name="loadFirstSet">
<partition partitioner="firstSetPartitioner">
<handler task-executor="asyncTaskExecutor" />
<step name="loadFileFromSetOne>
<tasklet>
<chunk reader="someReader" writer="someWriter" commit-interval="#{jobParameters['commit.interval']}" />
</tasklet>
</step>
</partition>
</step>
<step name="loadSecondSet">
<partition partitioner="secondSetPartitioner">
<handler task-executor="asyncTaskExecutor" />
<step name="loadFileFromSecondSet>
<tasklet>
<chunk reader="someOtherReader" writer="someOtherWriter" commit-interval="#{jobParameters['another.commit.interval']}" />
</tasklet>
</step>
</partition>
</step>
</job>
正如我在评论中所说,"So how do I define 2 sets of parallel flows which run in sequence to each?" 本身没有意义,您不能并行和顺序地开始两步。
我还是觉得你想"start loading file2 in step2 when file1 in step1 as finished loading"。这意味着加载文件发生在步骤的中间。我看到有两种解决方法。
假设这是您的配置:
<job id="job1">
<split id="split1" task-executor="taskExecutor" next="step3">
<flow>
<step id="step1" parent="s1"/>
</flow>
<flow>
<step id="step2" parent="s2"/>
</flow>
</split>
<step id="step3" parent="s4"/> <!-- not important here -->
</job>
<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>
但这将立即开始您的两个并行步骤。您需要阻止第 2 步的开始。因此,您需要在第 2 步的 reader
中使用 Delegate
,这将立即停止加载 file2,并等待信号开始读取。在 step1 代码的某处,您认为加载已完成,您向 step2 的委托 reader 发出信号以开始加载 file2.
第二个解决方案是:您创建自己的 SimpleAsyncTaskExecutor
,它将启动 step1 并等待来自 step1 的信号以启动 step2。它基本上是第一个解决方案,但您在自定义 Executor
中等待信号,而不是在 Delegate
reader 中。 (您可以从 SimpleAsyncTaskExecutor
复制源代码以获得想法)
这是有代价的,如果 step1 从未到达它向 step2 发出开始加载信号的部分,您的批处理将永远挂起。加载中的异常可能会导致这种情况。至于信号机制,Java 有很多方法可以做到这一点(wait()
和 notifiy()
、锁、信号量、非标准库可能)。
我不认为 spring 批次中有一些并行步骤触发器之王(但如果有,有人会发布它)。
我在问你的问题时已经回答了一些,你需要 2 次拆分:第一个加载文件集 A,第二个加载文件集 B。
<job id="job1">
<split id="splitForSet_A" task-executor="taskExecutor" next="splitForSet_B">
<flow><step id="step1" parent="s1"/></flow>
<flow><step id="step2" parent="s2"/></flow>
<flow><step id="step3" parent="s3"/></flow>
</split>
<split id="splitForSet_B" task-executor="taskExecutor" next="stepWhatever">
<flow><step id="step4" parent="s4"/></flow>
<flow><step id="step5" parent="s5"/></flow>
<flow><step id="step6" parent="s6"/></flow>
</split>
<step id="stepWhatever" parent="sx"/>
</job>
步骤 1、2 和 3 将 运行 并行(并加载文件集 A),然后,一旦它们全部结束,第二个拆分 (splitForSet_B
) 将开始并且 运行 并行执行第 4、5 和 6 步。拆分基本上是一个包含并行步骤 运行 的步骤。
您只需在每个步骤中指定您将使用的文件(因此第一次拆分的步骤与第二次拆分的步骤会有所不同。
我有一个作业配置,我在其中并行加载一组文件,在加载这组文件后,我还想并行加载另一组文件,但只有在第一组文件完全加载后才能加载。第二组具有对第一组的参考字段。我以为我可以使用第二个拆分,但从未成功,在 xsd 中,您似乎可以定义多个拆分,显然流程无法帮助我满足我的要求。 那么我如何定义 2 组并行流程,每个流程 运行 按顺序排列?
<job>
<split>
<flow>
<step next="step2"/>
<step id="step2"/>
</flow>
<flow>
<step ...>
</flow>
</split>
<split ../>
Asoub 是对的,这是完全有可能的,我做了一个简单的配置并且成功了。所以我得到的原始问题似乎还有其他一些问题,这些问题在定义 2 个拆分时会导致问题。
我使用的简单配置:
<batch:job id="batchJob" restartable="true">
<batch:split id="x" next="y">
<batch:flow>
<batch:step id="a">
<batch:tasklet allow-start-if-complete="true">
<batch:chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
</batch:tasklet>
</batch:step>
</batch:flow>
<batch:flow>
<batch:step id="b">
<batch:tasklet allow-start-if-complete="true">
<batch:chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
</batch:tasklet>
</batch:step>
</batch:flow>
</batch:split>
<batch:split id="y" next="e">
<batch:flow>
<batch:step id="c">
<batch:tasklet allow-start-if-complete="true">
<batch:chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
</batch:tasklet>
</batch:step>
</batch:flow>
<batch:flow>
<batch:step id="d">
<batch:tasklet allow-start-if-complete="true">
<batch:chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
</batch:tasklet>
</batch:step>
</batch:flow>
</batch:split>
<batch:step id="e">
<batch:tasklet allow-start-if-complete="true">
<batch:chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
</batch:tasklet>
</batch:step>
</batch:job>
INFO: Job: [FlowJob: [name=batchJob]] launched with the following parameters: [{random=994444}]
Nov 23, 2016 11:33:24 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [a]
Nov 23, 2016 11:33:24 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [b]
Nov 23, 2016 11:33:24 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [c]
Nov 23, 2016 11:33:24 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [d]
Nov 23, 2016 11:33:24 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [e]
Nov 23, 2016 11:33:25 PM org.springframework.batch.core.launch.support.SimpleJobLauncher run
INFO: Job: [FlowJob: [name=batchJob]] completed with the following parameters: [{random=994444}] and the following status: [COMPLETED]
我会使用两个分区步骤。每个分区程序将负责识别其各自集合中的文件,以供并发子步骤处理
<job>
<step name="loadFirstSet">
<partition partitioner="firstSetPartitioner">
<handler task-executor="asyncTaskExecutor" />
<step name="loadFileFromSetOne>
<tasklet>
<chunk reader="someReader" writer="someWriter" commit-interval="#{jobParameters['commit.interval']}" />
</tasklet>
</step>
</partition>
</step>
<step name="loadSecondSet">
<partition partitioner="secondSetPartitioner">
<handler task-executor="asyncTaskExecutor" />
<step name="loadFileFromSecondSet>
<tasklet>
<chunk reader="someOtherReader" writer="someOtherWriter" commit-interval="#{jobParameters['another.commit.interval']}" />
</tasklet>
</step>
</partition>
</step>
</job>
正如我在评论中所说,"So how do I define 2 sets of parallel flows which run in sequence to each?" 本身没有意义,您不能并行和顺序地开始两步。
我还是觉得你想"start loading file2 in step2 when file1 in step1 as finished loading"。这意味着加载文件发生在步骤的中间。我看到有两种解决方法。
假设这是您的配置:
<job id="job1">
<split id="split1" task-executor="taskExecutor" next="step3">
<flow>
<step id="step1" parent="s1"/>
</flow>
<flow>
<step id="step2" parent="s2"/>
</flow>
</split>
<step id="step3" parent="s4"/> <!-- not important here -->
</job>
<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>
但这将立即开始您的两个并行步骤。您需要阻止第 2 步的开始。因此,您需要在第 2 步的 reader
中使用 Delegate
,这将立即停止加载 file2,并等待信号开始读取。在 step1 代码的某处,您认为加载已完成,您向 step2 的委托 reader 发出信号以开始加载 file2.
第二个解决方案是:您创建自己的 SimpleAsyncTaskExecutor
,它将启动 step1 并等待来自 step1 的信号以启动 step2。它基本上是第一个解决方案,但您在自定义 Executor
中等待信号,而不是在 Delegate
reader 中。 (您可以从 SimpleAsyncTaskExecutor
复制源代码以获得想法)
这是有代价的,如果 step1 从未到达它向 step2 发出开始加载信号的部分,您的批处理将永远挂起。加载中的异常可能会导致这种情况。至于信号机制,Java 有很多方法可以做到这一点(wait()
和 notifiy()
、锁、信号量、非标准库可能)。
我不认为 spring 批次中有一些并行步骤触发器之王(但如果有,有人会发布它)。
我在问你的问题时已经回答了一些,你需要 2 次拆分:第一个加载文件集 A,第二个加载文件集 B。
<job id="job1">
<split id="splitForSet_A" task-executor="taskExecutor" next="splitForSet_B">
<flow><step id="step1" parent="s1"/></flow>
<flow><step id="step2" parent="s2"/></flow>
<flow><step id="step3" parent="s3"/></flow>
</split>
<split id="splitForSet_B" task-executor="taskExecutor" next="stepWhatever">
<flow><step id="step4" parent="s4"/></flow>
<flow><step id="step5" parent="s5"/></flow>
<flow><step id="step6" parent="s6"/></flow>
</split>
<step id="stepWhatever" parent="sx"/>
</job>
步骤 1、2 和 3 将 运行 并行(并加载文件集 A),然后,一旦它们全部结束,第二个拆分 (splitForSet_B
) 将开始并且 运行 并行执行第 4、5 和 6 步。拆分基本上是一个包含并行步骤 运行 的步骤。
您只需在每个步骤中指定您将使用的文件(因此第一次拆分的步骤与第二次拆分的步骤会有所不同。