Java 批处理:jobContext transientUserData 未通过步骤
Java batch: jobContext transientUserData not passed through steps
我正在使用 jsr-352 规范的 JBeret 实现。
这是我的作业配置,简而言之:
<job id="expired-customer-cancellation" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
version="1.0" jsl-name="job-parent" parent="job-parent">
<step id="step1" next="step2">
<chunk item-count="#{jobParameters['chunksize']}?:3">
<reader ref="eccReader">
</reader>
<writer ref="eccWriter" />
</chunk>
<partition>
<mapper ref="eccMapper">
<properties>
<property name="threads" value="#{jobParameters['threads']}?:3"/>
<property name="records" value="#{jobParameters['records']}?:30"/>
</properties>
</mapper>
</partition>
</step>
<step id="step2">
<batchlet ref="eccMailBatchlet" />
</step>
</job>
Itemwriter class 做了这样的事情:
@Named
public class EccWriter extends AbstractItemWriter {
@Inject
Logger logger;
@Inject
JobContext jobContext;
@Override
public void writeItems(List<Object> list) throws Exception {
@SuppressWarnings("unchecked")
ArrayList<String> processed = Optional.ofNullable(jobContext.getTransientUserData()).map(ArrayList.class::cast).orElse(new ArrayList<String>());
list.stream().map(UserLogin.class::cast).forEach(input -> {
if (someConditions) {
processed.add(input.getUserId());
}
});
jobContext.setTransientUserData(processed); // update job transient data with processed list
}
}
现在我希望在 step2 上调用 jobContext.getTransientUserData() 时获得更新的列表,而我得到的只是一个 null值。
此外,每个分区都有自己的 jobContext.transientUserData,因此它将始终在分区开始时以 null 值开始。
我认为 jobContext 本身可能因其名称而误导常见错误。
在整个作业中引入一些数据的自然方式是什么?
这是当前 API 的差距,"thread local" 的行为可能令人惊讶,我同意。
您可以使用的一种技术是改为使用步骤持久用户数据。
例如从第 1 步开始:
StepExecution.setPersistentUserData(processed);
然后从第 2 步开始:
@Inject
JobContext ctx;
List<StepExecution> stepExecs = BatchRuntime.getJobOperator().getStepExecutions(ctx.getInstanceId());
// ... not shown here, but sort through stepExecs and find the one from step1.
StepExecution step2Exec = ... ;
Serializable userData = step2Exec.getPersistentUserData()
这在之前已作为一个需要改进的领域被注意到,并且应该考虑用于 Jakarta Batch(规范的新家)的未来增强。
我正在使用 jsr-352 规范的 JBeret 实现。
这是我的作业配置,简而言之:
<job id="expired-customer-cancellation" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
version="1.0" jsl-name="job-parent" parent="job-parent">
<step id="step1" next="step2">
<chunk item-count="#{jobParameters['chunksize']}?:3">
<reader ref="eccReader">
</reader>
<writer ref="eccWriter" />
</chunk>
<partition>
<mapper ref="eccMapper">
<properties>
<property name="threads" value="#{jobParameters['threads']}?:3"/>
<property name="records" value="#{jobParameters['records']}?:30"/>
</properties>
</mapper>
</partition>
</step>
<step id="step2">
<batchlet ref="eccMailBatchlet" />
</step>
</job>
Itemwriter class 做了这样的事情:
@Named
public class EccWriter extends AbstractItemWriter {
@Inject
Logger logger;
@Inject
JobContext jobContext;
@Override
public void writeItems(List<Object> list) throws Exception {
@SuppressWarnings("unchecked")
ArrayList<String> processed = Optional.ofNullable(jobContext.getTransientUserData()).map(ArrayList.class::cast).orElse(new ArrayList<String>());
list.stream().map(UserLogin.class::cast).forEach(input -> {
if (someConditions) {
processed.add(input.getUserId());
}
});
jobContext.setTransientUserData(processed); // update job transient data with processed list
}
}
现在我希望在 step2 上调用 jobContext.getTransientUserData() 时获得更新的列表,而我得到的只是一个 null值。
此外,每个分区都有自己的 jobContext.transientUserData,因此它将始终在分区开始时以 null 值开始。
我认为 jobContext 本身可能因其名称而误导常见错误。
在整个作业中引入一些数据的自然方式是什么?
这是当前 API 的差距,"thread local" 的行为可能令人惊讶,我同意。
您可以使用的一种技术是改为使用步骤持久用户数据。
例如从第 1 步开始:
StepExecution.setPersistentUserData(processed);
然后从第 2 步开始:
@Inject
JobContext ctx;
List<StepExecution> stepExecs = BatchRuntime.getJobOperator().getStepExecutions(ctx.getInstanceId());
// ... not shown here, but sort through stepExecs and find the one from step1.
StepExecution step2Exec = ... ;
Serializable userData = step2Exec.getPersistentUserData()
这在之前已作为一个需要改进的领域被注意到,并且应该考虑用于 Jakarta Batch(规范的新家)的未来增强。