Spring Batch 3.0 : StepExecutionListener for a partitioned Step and cascading of execution context values to the partitioned job

Given a Spring Batch job that uses partitioning:

<job id="reportingJob" xmlns="http://www.springframework.org/schema/batch">
        <batch:listeners>
            <batch:listener ref="reportingJobExecutionListener" />
        </batch:listeners>
        <batch:step id="reportingMasterStep">
            <partition step="reportingSlaveStep"
                partitioner="reportingPartitioner">
                <batch:handler grid-size="10" task-executor="taskExecutor" />
            </partition>
        </batch:step>
</job>

reportingSlaveStep is defined as:

<step id="reportingSlaveStep" xmlns="http://www.springframework.org/schema/batch">
        <job ref="reportingSlaveJob" />
</step>

reportingSlaveJob is defined as:

    <job id="reportingSlaveJob" xmlns="http://www.springframework.org/schema/batch">
        <batch:listeners>
            <batch:listener ref="reportsOutputListener" />
        </batch:listeners>
        <batch:split id="reportsCreationSplit"
            task-executor="taskExecutor">
            <batch:flow>
                <batch:step id="basicReportStep">
                    <tasklet throttle-limit="5" task-executor="taskExecutor">
                        <batch:chunk reader="basicReportReader"
                            writer="basicReportWriter" commit-interval="500" />
                    </tasklet>
                </batch:step>
            </batch:flow>
            <batch:flow>
                <batch:step id="advancedReportStep">
                    <tasklet throttle-limit="5" task-executor="taskExecutor">
                        <batch:chunk reader="advancedReportDataReader" writer="advancedReportWriter"
                            commit-interval="500" />
                    </tasklet>
                </batch:step>
            </batch:flow>
        </batch:split>
    </job>

I have 2 questions now:

  1. I want a new reportsOutputListener instance to be created for each partition. Can I achieve this by making reportsOutputListener a Step scoped bean?
  2. I want the same jobExecutionContext created for reportingJob to be accessible in reportingSlaveJob. Do I need to do any special handling for this, or is the same jobExecutionContext instance from reportingJob used by reportingSlaveStep's reportingSlaveJob as well?
  3. EDIT: When I run the above job, at times I get an exception saying "A job execution for this job is already running" and other times I get a NullPointerException at MapExecutionContextDao.java:130.

EDIT: Also note that, regarding point 2, the slaveJob is not able to access the values that reportingPartitioner adds to the stepExecutionContext (accessed in the Spring configuration XML); the values for those keys come back as null.

I want a new reportsOutputListener instance to be created for each partition. Can I achieve this by making reportsOutputListener a Step scoped bean?

The answer is yes (as Mahmoud Ben Hassine mentioned in the comments).
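
For reference, here is a minimal sketch of what such a step-scoped listener declaration could look like in the XML configuration; the class name and package are placeholders for whatever reportsOutputListener is actually implemented with:

<!-- scope="step" makes Spring create a fresh listener instance for every step
     execution, i.e. for every partition launched by reportingMasterStep.
     The class name below is illustrative. -->
<bean id="reportsOutputListener"
      class="com.example.reporting.ReportsOutputListener"
      scope="step" />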

I want the same jobExecutionContext created for reportingJob to be accessible in reportingSlaveJob. Do I need to do any special handling for this, or is the same jobExecutionContext instance from reportingJob used by reportingSlaveStep's reportingSlaveJob as well?

The answer is no. I dug into the Spring Batch code and found that JobStep uses a JobParametersExtractor to copy values from the stepExecutionContext to the JobParameters. This means that reportingSlaveJob can access those values from its JobParameters rather than from the StepExecutionContext. That said, for some reason the DefaultJobParametersExtractor implementation in Spring Batch 3.0 did not seem to copy the values to the jobParameters as expected, so I ended up writing the following custom extractor:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.step.job.JobParametersExtractor;
import org.springframework.batch.item.ExecutionContext;

public class CustomJobParametersExtractor implements JobParametersExtractor {

    // keys whose values should be promoted from the parent job/step execution
    // context (or parent job parameters) to the delegate job's parameters
    private Set<String> keys;

    public CustomJobParametersExtractor() {
        this.keys = new HashSet<>();
    }

    @Override
    public JobParameters getJobParameters(Job job, StepExecution stepExecution) {
        JobParametersBuilder builder = new JobParametersBuilder();
        Map<String, JobParameter> jobParameters = stepExecution.getJobParameters().getParameters();
        ExecutionContext stepExecutionContext = stepExecution.getExecutionContext();
        ExecutionContext jobExecutionContext = stepExecution.getJobExecution().getExecutionContext();

        // copy job parameters from parent job to delegate job
        for (String key : jobParameters.keySet()) {
            builder.addParameter(key, jobParameters.get(key));
        }

        // copy job/step context from parent job/step to delegate job
        for (String key : keys) {
            if (jobExecutionContext.containsKey(key)) {
                builder.addString(key, jobExecutionContext.getString(key));
            } else if (stepExecutionContext.containsKey(key)) {
                builder.addString(key, stepExecutionContext.getString(key));
            } else if (jobParameters.containsKey(key)) {
                builder.addString(key, (String) jobParameters.get(key).getValue());
            }
        }
        return builder.toJobParameters();
    }

    public void setKeys(String[] keys) {
        this.keys = new HashSet<>(Arrays.asList(keys));
    }

}

I can then use the above extractor in the reporting slave step as follows:

<step id="reportingSlaveStep" xmlns="http://www.springframework.org/schema/batch">
        <job ref="reportingSlaveJob" job-parameters-extractor="customJobParametersExtractor"/>
</step>

where customJobParametersExtractor is a bean of type CustomJobParametersExtractor that is configured with all the keys we want to copy into the JobParameters of reportingSlaveJob.
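
For completeness, a sketch of how such an extractor bean might be wired; the package name is a placeholder and the key names below are made up for illustration, so substitute the keys that reportingPartitioner actually writes:

<!-- The extractor handed to the <job/> element of reportingSlaveStep.
     The keys listed here are illustrative. -->
<bean id="customJobParametersExtractor"
      class="com.example.reporting.CustomJobParametersExtractor">
    <property name="keys">
        <list>
            <value>reportDate</value>
            <value>regionCode</value>
        </list>
    </property>
</bean>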

When I run the above job, at times I get an exception saying "A job execution for this job is already running" and other times I get a NullPointerException at MapExecutionContextDao.java:130.

This was happening because, without my CustomJobParametersExtractor, reportingSlaveJob was being launched with empty JobParameters. For Spring Batch to create a new job instance, the job parameters must be different on every launch of reportingSlaveJob. Using the CustomJobParametersExtractor solves this problem as well.
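
As a final illustration, the promoted values can then be read back inside reportingSlaveJob through step-scoped late binding on jobParameters; the reader class, package, property, and key names below are hypothetical:

<!-- Because each partition promotes different values, every launch of
     reportingSlaveJob receives distinct job parameters and therefore gets a
     new job instance. The class and property names here are illustrative. -->
<bean id="basicReportReader"
      class="com.example.reporting.BasicReportReader"
      scope="step">
    <property name="regionCode" value="#{jobParameters['regionCode']}" />
</bean>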