Spring Batch 3.0: StepExecutionListener for a partitioned Step and cascading of execution context values to the partitioned job
Given a Spring Batch job that uses partitioning:
<job id="reportingJob" xmlns="http://www.springframework.org/schema/batch">
<batch:listeners>
<batch:listener ref="reportingJobExecutionListenerr" />
</batch:listeners>
<batch:step id="reportingMasterStep">
<partition step="reportingSlaveStep"
partitioner="reportingPartitioner">
<batch:handler grid-size="10" task-executor="taskExecutor" />
</partition>
</batch:step>
</job>
and reportingSlaveStep defined as:
<step id="reportingSlaveStep" xmlns="http://www.springframework.org/schema/batch">
<job ref="reportingSlaveJob" />
</step>
and reportingSlaveJob defined as:
<job id="reportingSlaveJob" xmlns="http://www.springframework.org/schema/batch">
<batch:listeners>
<batch:listener ref="reportsOutputListener" />
</batch:listeners>
<batch:split id="reportsCreationSplit"
task-executor="taskExecutor">
<batch:flow>
<batch:step id="basicReportStep">
<tasklet throttle-limit="5" task-executor="taskExecutor">
<batch:chunk reader="basicReportReader"
writer="basicReportWriter" commit-interval="500" />
</tasklet>
</batch:step>
</batch:flow>
<batch:flow>
<batch:step id="advancedReportStep">
<tasklet throttle-limit="5" task-executor="taskExecutor">
<batch:chunk reader="advancedReportDataReader" writer="advancedReportWriter"
commit-interval="500" />
</tasklet>
</batch:step>
</batch:flow>
</batch:split>
</job>
I now have two questions:
- I want a new reportsOutputListener instance to be created for each partition. Can I achieve this by making reportsOutputListener a step-scoped bean?
- I want the same jobExecutionContext created for reportingJob to be accessible in reportingSlaveJob. Do I need any special handling for this, or is the same jobExecutionContext instance from reportingJob used by reportingSlaveJob as well?

EDIT: When I run the above job, at times I get an exception saying "A job execution for this job is already running", and at other times I get a NullPointerException at MapExecutionContextDao.java:130.

EDIT: Also note that, for point 2, the slaveJob is not able to access the values added to the stepExecutionContext by the reportingPartitioner (accessed using stepExecutionContext in the Spring configuration XML). The values for those keys in the stepExecutionContext show up as null.
I want a new reportsOutputListener instance to be created for each
partition. Can I achieve this by making reportsOutputListener a Step
scoped bean?
The answer is yes (as Mahmoud Ben Hassine mentioned in the comments).
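For illustration, a minimal sketch of what a step-scoped listener definition could look like in the configuration XML; the class name and the outputDir property are assumptions, not taken from the original configuration:

<bean id="reportsOutputListener" class="com.example.ReportsOutputListener" scope="step">
    <!-- scope="step" gives every step execution, i.e. every partition, its own listener instance -->
    <!-- hypothetical late-bound property; replace with whatever per-partition state the listener needs -->
    <property name="outputDir" value="#{stepExecutionContext['outputDir']}" />
</bean>

Because the bean is step scoped, any #{stepExecutionContext[...]} expression is resolved lazily against the execution context of the step that is currently running.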
I want to be able to access the same jobExecutionContext created for reportingJob in reportingSlaveJob. Do I need any special handling for this, or is the same jobExecutionContext instance from reportingJob used by reportingSlaveJob as well?
The answer is no. I dug into the Spring Batch code and found that JobStep uses a JobParametersExtractor to copy values from the stepExecutionContext into the JobParameters of the delegate job. This means reportingSlaveJob can access those values from its JobParameters rather than from the StepExecutionContext. That said, for some reason the DefaultJobParametersExtractor implementation in Spring Batch 3.0 did not seem to copy the values into the jobParameters as expected, so I ended up writing the following custom extractor:
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.step.job.JobParametersExtractor;
import org.springframework.batch.item.ExecutionContext;

public class CustomJobParametersExtractor implements JobParametersExtractor {

    private Set<String> keys;

    public CustomJobParametersExtractor() {
        this.keys = new HashSet<>();
    }

    @Override
    public JobParameters getJobParameters(Job job, StepExecution stepExecution) {
        JobParametersBuilder builder = new JobParametersBuilder();
        Map<String, JobParameter> jobParameters = stepExecution.getJobParameters().getParameters();
        ExecutionContext stepExecutionContext = stepExecution.getExecutionContext();
        ExecutionContext jobExecutionContext = stepExecution.getJobExecution().getExecutionContext();

        // copy the job parameters of the parent job to the delegate job
        for (String key : jobParameters.keySet()) {
            builder.addParameter(key, jobParameters.get(key));
        }

        // copy the configured keys from the parent job/step execution context to the delegate job's parameters
        for (String key : keys) {
            if (jobExecutionContext.containsKey(key)) {
                builder.addString(key, jobExecutionContext.getString(key));
            } else if (stepExecutionContext.containsKey(key)) {
                builder.addString(key, stepExecutionContext.getString(key));
            } else if (jobParameters.containsKey(key)) {
                builder.addString(key, (String) jobParameters.get(key).getValue());
            }
        }
        return builder.toJobParameters();
    }

    public void setKeys(String[] keys) {
        this.keys = new HashSet<>(Arrays.asList(keys));
    }
}
I can then use the above extractor in the reporting slave step as follows:
<step id="reportingSlaveStep" xmlns="http://www.springframework.org/schema/batch">
<job ref="reportingSlaveJob" job-parameters-extractor="customJobParametersExtractor"/>
</step>
where customJobParametersExtractor is a bean of type CustomJobParametersExtractor that is given all the keys we want copied into the JobParameters of reportingSlaveJob.
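For completeness, a sketch of how that bean might be wired; the package name and the keys listed here (reportDate, partitionId) are assumptions for illustration only:

<bean id="customJobParametersExtractor" class="com.example.batch.CustomJobParametersExtractor">
    <!-- hypothetical keys: whichever job/step execution context entries the slave job should see as job parameters -->
    <property name="keys" value="reportDate,partitionId" />
</bean>

Spring converts the comma-separated value into the String[] expected by setKeys. Including a partition-specific key (for example an identifier that reportingPartitioner puts into each partition's step execution context) also makes the resulting job parameters unique per partition, which matters for the next point.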
When I run the above job, at times I get an exception saying "A job execution for this job is already running", and other times I get a NullPointerException at MapExecutionContextDao.java:130.
This happens because, without my CustomJobParametersExtractor, reportingSlaveJob was being launched with empty JobParameters. For Spring Batch to create a new job instance, the job parameters must be different for every launch of reportingSlaveJob. Using the CustomJobParametersExtractor fixed this problem as well.