将 2 个拆分定义为 运行 一组并行的步骤

Defining 2 splits to run a set of steps in parallel

我有一个作业配置,我在其中并行加载一组文件,在加载这组文件后,我还想并行加载另一组文件,但只有在第一组文件完全加载后才能加载。第二组具有对第一组的参考字段。我以为我可以使用第二个拆分,但从未成功,在 xsd 中,您似乎可以定义多个拆分,显然流程无法帮助我满足我的要求。 那么我如何定义 2 组并行流程,每个流程 运行 按顺序排列?

<job>
  <split>
    <flow>
      <step next="step2"/>
      <step id="step2"/>
    </flow>
    <flow>
      <step ...>
    </flow>
  </split>
 <split ../>

Asoub 是对的,这是完全有可能的,我做了一个简单的配置并且成功了。所以我得到的原始问题似乎还有其他一些问题,这些问题在定义 2 个拆分时会导致问题。

我使用的简单配置:

<batch:job id="batchJob" restartable="true">
    <batch:split id="x" next="y">
        <batch:flow>
            <batch:step id="a">
                <batch:tasklet allow-start-if-complete="true">
                    <batch:chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
                </batch:tasklet>
            </batch:step>
        </batch:flow>
        <batch:flow>
            <batch:step id="b">
                <batch:tasklet allow-start-if-complete="true">
                    <batch:chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
                </batch:tasklet>
            </batch:step>
        </batch:flow>
    </batch:split>
    <batch:split id="y" next="e">
        <batch:flow>
            <batch:step id="c">
                <batch:tasklet allow-start-if-complete="true">
                    <batch:chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
                </batch:tasklet>
            </batch:step>
        </batch:flow>
        <batch:flow>
            <batch:step id="d">
                <batch:tasklet allow-start-if-complete="true">
                    <batch:chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
                </batch:tasklet>
            </batch:step>
        </batch:flow>
    </batch:split>
    <batch:step id="e">
        <batch:tasklet allow-start-if-complete="true">
            <batch:chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
        </batch:tasklet>
    </batch:step>
</batch:job>


INFO: Job: [FlowJob: [name=batchJob]] launched with the following parameters: [{random=994444}]
Nov 23, 2016 11:33:24 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [a]
Nov 23, 2016 11:33:24 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [b]
Nov 23, 2016 11:33:24 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [c]
Nov 23, 2016 11:33:24 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [d]
Nov 23, 2016 11:33:24 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [e]
Nov 23, 2016 11:33:25 PM org.springframework.batch.core.launch.support.SimpleJobLauncher run
INFO: Job: [FlowJob: [name=batchJob]] completed with the following parameters: [{random=994444}] and the following status: [COMPLETED]

我会使用两个分区步骤。每个分区程序将负责识别其各自集合中的文件,以供并发子步骤处理

<job>
  <step name="loadFirstSet">
    <partition partitioner="firstSetPartitioner">
      <handler task-executor="asyncTaskExecutor" />
      <step name="loadFileFromSetOne>
        <tasklet>
          <chunk reader="someReader" writer="someWriter" commit-interval="#{jobParameters['commit.interval']}" />
        </tasklet>
      </step>
    </partition>
  </step>
  <step name="loadSecondSet">
    <partition partitioner="secondSetPartitioner">
      <handler task-executor="asyncTaskExecutor" />
      <step name="loadFileFromSecondSet>
        <tasklet>
          <chunk reader="someOtherReader" writer="someOtherWriter" commit-interval="#{jobParameters['another.commit.interval']}" />
        </tasklet>
      </step>
    </partition>
  </step>
</job>

正如我在评论中所说,"So how do I define 2 sets of parallel flows which run in sequence to each?" 本身没有意义,您不能并行和顺序地开始两步。

我还是觉得你想"start loading file2 in step2 when file1 in step1 as finished loading"。这意味着加载文件发生在步骤的中间。我看到有两种解决方法。

假设这是您的配置:

<job id="job1">
    <split id="split1" task-executor="taskExecutor" next="step3">
        <flow>
            <step id="step1" parent="s1"/>
        </flow>
        <flow>
            <step id="step2" parent="s2"/>
        </flow>
    </split>
    <step id="step3" parent="s4"/> <!-- not important here -->
</job>

<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>

但这将立即开始您的两个并行步骤。您需要阻止第 2 步的开始。因此,您需要在第 2 步的 reader 中使用 Delegate,这将立即停止加载 file2,并等待信号开始读取。在 step1 代码的某处,您认为加载已完成,您向 step2 的委托 reader 发出信号以开始加载 file2.

第二个解决方案是:您创建自己的 SimpleAsyncTaskExecutor,它将启动 step1 并等待来自 step1 的信号以启动 step2。它基本上是第一个解决方案,但您在自定义 Executor 中等待信号,而不是在 Delegate reader 中。 (您可以从 SimpleAsyncTaskExecutor 复制源代码以获得想法)

这是有代价的,如果 step1 从未到达它向 step2 发出开始加载信号的部分,您的批处理将永远挂起。加载中的异常可能会导致这种情况。至于信号机制,Java 有很多方法可以做到这一点(wait()notifiy()、锁、信号量、非标准库可能)。

我不认为 spring 批次中有一些并行步骤触发器之王(但如果有,有人会发布它)。


我在问你的问题时已经回答了一些,你需要 2 次拆分:第一个加载文件集 A,第二个加载文件集 B。

<job id="job1">
    <split id="splitForSet_A" task-executor="taskExecutor" next="splitForSet_B">
        <flow><step id="step1" parent="s1"/></flow>
        <flow><step id="step2" parent="s2"/></flow>
        <flow><step id="step3" parent="s3"/></flow>
    </split>
   <split id="splitForSet_B" task-executor="taskExecutor" next="stepWhatever">
        <flow><step id="step4" parent="s4"/></flow>
        <flow><step id="step5" parent="s5"/></flow>
        <flow><step id="step6" parent="s6"/></flow>
    </split>
    <step id="stepWhatever" parent="sx"/>
</job>

步骤 1、2 和 3 将 运行 并行(并加载文件集 A),然后,一旦它们全部结束,第二个拆分 (splitForSet_B) 将开始并且 运行 并行执行第 4、5 和 6 步。拆分基本上是一个包含并行步骤 运行 的步骤。

您只需在每个步骤中指定您将使用的文件(因此第一次拆分的步骤与第二次拆分的步骤会有所不同。