JSR 352:如何从分区步骤的每个分区的编写器收集数据?
JSR 352 :How to collect data from the Writer of each Partition of a Partitioned Step?
因此,我在写入数据库的步骤中有 2 个分区。我想记录每个分区写入的行数,求和,打印到日志;
我想在 Writer 中使用一个 static
变量,并使用 Step Context/Job Context 在 Step Listener 的 afterStep()
中获取它。然而,当我尝试它时,我得到了 null
。我能够在 Reader 的 close()
中获得这些值。
这样做正确吗?或者我应该使用 Partition Collector/Reducer/ Analyzer?
我在 Websphere Liberty 中使用 java 批处理。我正在 Eclipse 中开发。
I was thinking of using a static variable in the Writer and use Step Context/Job Context to get it in afterStep() of the Step Listener. However when i tried it i got null.
此时 ItemWriter 可能已经被销毁,但我不确定。
Is this the right way to go about it?
是的,应该够用了。但是,您需要确保所有分区共享总行数,因为批处理运行时为每个分区维护一个 StepContext 克隆。您应该使用 JobContext
.
我认为使用 PartitionCollector 和 PartitionAnalyzer 也是一个不错的选择。接口 PartitionCollector 有一个方法 collectPartitionData()
来收集来自其分区的数据。收集后,批处理运行时会将此数据传递给 PartitionAnalyzer 以分析数据。请注意,有
- 每步 N 个 PartitionCollector(每个分区 1 个)
- 每个步骤 N 个 StepContext(每个分区 1 个)
- 每步 1 个 PartitionAnalyzer
写入的记录可以通过StepContext的transientUserData
传递。由于 StepContext 是为它自己的 step-partition 保留的,临时用户数据不会被其他分区覆盖。
实现如下:
MyItemWriter :
@Inject
private StepContext stepContext;
@Override
public void writeItems(List<Object> items) throws Exception {
// ...
Object userData = stepContext.getTransientUserData();
stepContext.setTransientUserData(partRowCount);
}
我的分区收集器
@Inject
private StepContext stepContext;
@Override
public Serializable collectPartitionData() throws Exception {
// get transient user data
Object userData = stepContext.getTransientUserData();
int partRowCount = userData != null ? (int) userData : 0;
return partRowCount;
}
我的分区分析器
private int rowCount = 0;
@Override
public void analyzeCollectorData(Serializable fromCollector) throws Exception {
rowCount += (int) fromCollector;
System.out.printf("%d rows processed (all partitions).%n", rowCount);
}
让我对已接受的答案提供一些替代方案并添加一些评论。
PartitionAnalyzer 变体 - 使用 analyzeStatus() 方法
另一种技术是使用 analyzeStatus
,它仅在每个整个分区的末尾被调用,并通过 分区级别 退出状态。
public void analyzeStatus(BatchStatus batchStatus, String exitStatus)
相比之下,上面使用 analyzeCollectorData
的答案在每个分区的每个块的末尾被调用。
例如
public class MyItemWriteListener extends AbstractItemWriteListener {
@Inject
StepContext stepCtx;
@Override
public void afterWrite(List<Object> items) throws Exception {
// update 'newCount' based on items.size()
stepCtx.setExitStatus(Integer.toString(newCount));
}
显然,这只有在您没有将退出状态用于其他目的时才有效。您可以设置任何工件的退出状态(尽管这种自由可能是必须跟踪的另一件事)。
评论
API 旨在促进跨 JVM 分派各个分区的实现,(例如,在 Liberty 中你可以看到这个 here。)但是使用 static 将您绑定到单个 JVM,因此不推荐使用此方法。
另请注意,JobContext 和 StepContext 都以我们在批处理中看到的 "thread-local"-like 方式实现.
因此,我在写入数据库的步骤中有 2 个分区。我想记录每个分区写入的行数,求和,打印到日志;
我想在 Writer 中使用一个 static
变量,并使用 Step Context/Job Context 在 Step Listener 的 afterStep()
中获取它。然而,当我尝试它时,我得到了 null
。我能够在 Reader 的 close()
中获得这些值。
这样做正确吗?或者我应该使用 Partition Collector/Reducer/ Analyzer?
我在 Websphere Liberty 中使用 java 批处理。我正在 Eclipse 中开发。
I was thinking of using a static variable in the Writer and use Step Context/Job Context to get it in afterStep() of the Step Listener. However when i tried it i got null.
此时 ItemWriter 可能已经被销毁,但我不确定。
Is this the right way to go about it?
是的,应该够用了。但是,您需要确保所有分区共享总行数,因为批处理运行时为每个分区维护一个 StepContext 克隆。您应该使用 JobContext
.
我认为使用 PartitionCollector 和 PartitionAnalyzer 也是一个不错的选择。接口 PartitionCollector 有一个方法 collectPartitionData()
来收集来自其分区的数据。收集后,批处理运行时会将此数据传递给 PartitionAnalyzer 以分析数据。请注意,有
- 每步 N 个 PartitionCollector(每个分区 1 个)
- 每个步骤 N 个 StepContext(每个分区 1 个)
- 每步 1 个 PartitionAnalyzer
写入的记录可以通过StepContext的transientUserData
传递。由于 StepContext 是为它自己的 step-partition 保留的,临时用户数据不会被其他分区覆盖。
实现如下:
MyItemWriter :
@Inject
private StepContext stepContext;
@Override
public void writeItems(List<Object> items) throws Exception {
// ...
Object userData = stepContext.getTransientUserData();
stepContext.setTransientUserData(partRowCount);
}
我的分区收集器
@Inject
private StepContext stepContext;
@Override
public Serializable collectPartitionData() throws Exception {
// get transient user data
Object userData = stepContext.getTransientUserData();
int partRowCount = userData != null ? (int) userData : 0;
return partRowCount;
}
我的分区分析器
private int rowCount = 0;
@Override
public void analyzeCollectorData(Serializable fromCollector) throws Exception {
rowCount += (int) fromCollector;
System.out.printf("%d rows processed (all partitions).%n", rowCount);
}
让我对已接受的答案提供一些替代方案并添加一些评论。
PartitionAnalyzer 变体 - 使用 analyzeStatus() 方法
另一种技术是使用 analyzeStatus
,它仅在每个整个分区的末尾被调用,并通过 分区级别 退出状态。
public void analyzeStatus(BatchStatus batchStatus, String exitStatus)
相比之下,上面使用 analyzeCollectorData
的答案在每个分区的每个块的末尾被调用。
例如
public class MyItemWriteListener extends AbstractItemWriteListener {
@Inject
StepContext stepCtx;
@Override
public void afterWrite(List<Object> items) throws Exception {
// update 'newCount' based on items.size()
stepCtx.setExitStatus(Integer.toString(newCount));
}
显然,这只有在您没有将退出状态用于其他目的时才有效。您可以设置任何工件的退出状态(尽管这种自由可能是必须跟踪的另一件事)。
评论
API 旨在促进跨 JVM 分派各个分区的实现,(例如,在 Liberty 中你可以看到这个 here。)但是使用 static 将您绑定到单个 JVM,因此不推荐使用此方法。
另请注意,JobContext 和 StepContext 都以我们在批处理中看到的 "thread-local"-like 方式实现.