如何覆盖 Spring Batch CompositeItemWriter manage transaction for delegate writers 在出现异常的情况下?
How to override Spring Batch CompositeItemWriter manage transaction for delegate writers in case of exception arises?
我在这里扩展这个 问题:
在我的例子中,我有一个下面的 CompositeItemWriter
,它将数据写入同一数据库的多个 table,在写入数据之前,它通过实施各种业务规则来转换数据。这里一条记录可能满足不同的业务规则等。因此一个作者可能比其他人获得更多数据。
@Bean
public CompositeItemWriter<Employee> EmployeeCompositeWriter() throws Exception {
List<ItemWriter<? super Employee>> employee = new ArrayList<>();
employee.add(employeeWriter());
employee.add(departmentWriter());
employee.add(stockWriter());
employee.add(purchaseWriter());
CompositeItemWriter<Employee> compositeItemWriter = new CompositeItemWriter<>();
compositeItemWriter.setDelegates(employee);
compositeItemWriter.afterPropertiesSet();
return compositeItemWriter;
}
场景 - 假设第一个作者工作得很好,第二个作者产生异常,然后第三个和第四个作者没有被调用这是Automic
自然默认的Spring 由于事务回滚而发生批处理。
这里即使2nd writer出现任何异常,我也想成功调用3rd和4th writer并保存数据,我也想成功保存1st writer和2nd writer的数据..只有异常数据我想要在 SkipListener
的帮助下存储到错误 Table 中,以识别哪些记录是垃圾记录。
解决方案-为了实现上述场景,我们在每个编写器写入方法上添加了@Transactional(propagation = Propagation.REQUIRES_NEW)
,第一个编写器现在保存数据,第二个编写器生成异常(使用namedJdbcTemplate.batchUpdate()
以批量更新数据)我们正在缓存它并重新抛出它,但我们可以看到 提交级别降低到 1 (偏离路线以识别确切的垃圾记录)并且当第二个作者再次出现异常时,第一个作者被调用并保存重复数据,第二个、第三个和第四个作者被调用,而且垃圾记录没有流向第三个和第四个作者。
在这里,如果一条或几条记录是垃圾,我不希望整个批处理作业停止,因为这个作业对我们每次 运行 都是至关重要的。有没有什么办法可以保存所有没有出现异常的数据,并且在可能的情况下借助SkipListener
或任何其他方式仅将异常数据保存到错误table中?
有什么方法可以将任何步骤的(READER 或处理器)部分的批处理组件重用到另一个步骤?
我看不出有什么方法可以让 spring-batch 将整个块写成原子的单个事务与你的想法保持原子性,只要你想 skiplistener
.
我不确定这是否可行,但您可以快速测试一下。这就是消息在某些集成框架(如骆驼)中从一个处理器到错误处理流程中携带异常的方式。
您的项目 reader 应该 return 一个 EmployeeWrapper
,其中包含 employee
记录并有一个存储异常的字段。
您的 CompositeItemWriter 收到 List<EmployeeWrapper>
并且复合编写器有 5 个编写器而不是 4 个。第 5 个编写器将执行您的 SkipListener
会执行的操作。
List<ItemWriter<? super EmployeeWrapper>> employee = new ArrayList<>();
employee.add(employeeWriter());
employee.add(departmentWriter());
employee.add(stockWriter());
employee.add(purchaseWriter());
employee.add(errorRecordWriter());
您的前 4 位作者从不抛出异常,而是将其标记为已处理但将捕获的异常添加为 EmployeeWrapper 的属性。
您的第 5 个 errorRecordWriter
接收所有记录,检查任何添加了异常属性的记录并将它们写入错误 table。如果写入错误记录失败,你可以抛出异常并重试所有 5 个写入器。
关于批量更新失败时如何知道哪条记录是错误记录。似乎当块中发生错误时,spring 回滚块并开始重试该块中的记录,以便它知道哪个记录有问题。所以你可以在你的个人作家身上做同样的事情。即捕获批量更新异常,然后一一重试,分离出错误记录
这里有几件事:
- 不要将
@Transactional
与 Spring Batch 一起使用 - Spring Batch 会为您管理事务,因此使用该注释会导致问题。不要使用它。
- 自己管理异常 - 在您描述的场景中,您想为同一项目调用四个
ItemWriter
实现,但想跳过异常在委托 ItemWriter
级别,您需要编写自己的 CompositeItemWriter
实现。 Spring 出于方便,批处理提供了这种级别的组合(我们将同一项目委托给每个 ItemWriter
实现),但从框架的角度来看,它只是一个 ItemWriter
。为了在子 ItemWriter
级别处理异常,您需要编写自己的包装器并自行管理异常。
更新:
我所指的自定义 ItemWriter
的示例实现(请注意以下代码未经测试):
public class MyCompositeItemWriter<T> implements ItemWriter<T> {
private List<ItemWriter<? super T>> delegates;
@Override
public void write(List<? extends T> items) throws Exception {
for(ItemWriter delegate : delegates) {
try {
delegate.write(items);
}
catch (Exception e) {
// Do logging/error handling here
}
}
}
@Override
public void setDelegates(List<ItemWriter<? super T>> delegates) {
super.setDelegates(delegates);
this.delegates = delegates;
}
}
问题的主要原因是,我们试图使用两个不同的 ItemWriter
将数据写入同一个 table,这导致交易行为异常。
我们已经实现了 SkipListenets
(考虑到当我们在初始数据加载时执行验证时 use 可能不会经常获取垃圾或垃圾数据这一事实。)
由于我们在批处理作业中实施了 “Spring 批处理跳过技术”,这有助于我们指定某些异常类型和最大编号。跳过的项目,每当抛出其中一个可跳过的异常时,批处理作业不会失败,但会跳过该特定项目并转到下一个项目。只有当最大没有。达到跳过的项目数,批处理作业将失败。我们使用具有 “容错” 功能的跳过逻辑 Spring 批处理应用于 chunk-oriented 步骤中的项目,而不是整个步骤。
因此,如果项目未能在一个代表处写入,那么所有其他代表都将认为它失败(该项目不会传递给其他代表)并且我们对此很好,因为我们正在捕获详细信息错误日志 table,我们可以根据需要从那里重新处理它。
我在这里扩展这个
在我的例子中,我有一个下面的 CompositeItemWriter
,它将数据写入同一数据库的多个 table,在写入数据之前,它通过实施各种业务规则来转换数据。这里一条记录可能满足不同的业务规则等。因此一个作者可能比其他人获得更多数据。
@Bean
public CompositeItemWriter<Employee> EmployeeCompositeWriter() throws Exception {
List<ItemWriter<? super Employee>> employee = new ArrayList<>();
employee.add(employeeWriter());
employee.add(departmentWriter());
employee.add(stockWriter());
employee.add(purchaseWriter());
CompositeItemWriter<Employee> compositeItemWriter = new CompositeItemWriter<>();
compositeItemWriter.setDelegates(employee);
compositeItemWriter.afterPropertiesSet();
return compositeItemWriter;
}
场景 - 假设第一个作者工作得很好,第二个作者产生异常,然后第三个和第四个作者没有被调用这是Automic
自然默认的Spring 由于事务回滚而发生批处理。
这里即使2nd writer出现任何异常,我也想成功调用3rd和4th writer并保存数据,我也想成功保存1st writer和2nd writer的数据..只有异常数据我想要在 SkipListener
的帮助下存储到错误 Table 中,以识别哪些记录是垃圾记录。
解决方案-为了实现上述场景,我们在每个编写器写入方法上添加了@Transactional(propagation = Propagation.REQUIRES_NEW)
,第一个编写器现在保存数据,第二个编写器生成异常(使用namedJdbcTemplate.batchUpdate()
以批量更新数据)我们正在缓存它并重新抛出它,但我们可以看到 提交级别降低到 1 (偏离路线以识别确切的垃圾记录)并且当第二个作者再次出现异常时,第一个作者被调用并保存重复数据,第二个、第三个和第四个作者被调用,而且垃圾记录没有流向第三个和第四个作者。
在这里,如果一条或几条记录是垃圾,我不希望整个批处理作业停止,因为这个作业对我们每次 运行 都是至关重要的。有没有什么办法可以保存所有没有出现异常的数据,并且在可能的情况下借助SkipListener
或任何其他方式仅将异常数据保存到错误table中?
有什么方法可以将任何步骤的(READER 或处理器)部分的批处理组件重用到另一个步骤?
我看不出有什么方法可以让 spring-batch 将整个块写成原子的单个事务与你的想法保持原子性,只要你想 skiplistener
.
我不确定这是否可行,但您可以快速测试一下。这就是消息在某些集成框架(如骆驼)中从一个处理器到错误处理流程中携带异常的方式。
您的项目 reader 应该 return 一个
EmployeeWrapper
,其中包含employee
记录并有一个存储异常的字段。您的 CompositeItemWriter 收到
List<EmployeeWrapper>
并且复合编写器有 5 个编写器而不是 4 个。第 5 个编写器将执行您的SkipListener
会执行的操作。
List<ItemWriter<? super EmployeeWrapper>> employee = new ArrayList<>();
employee.add(employeeWriter());
employee.add(departmentWriter());
employee.add(stockWriter());
employee.add(purchaseWriter());
employee.add(errorRecordWriter());
您的前 4 位作者从不抛出异常,而是将其标记为已处理但将捕获的异常添加为 EmployeeWrapper 的属性。
您的第 5 个
errorRecordWriter
接收所有记录,检查任何添加了异常属性的记录并将它们写入错误 table。如果写入错误记录失败,你可以抛出异常并重试所有 5 个写入器。关于批量更新失败时如何知道哪条记录是错误记录。似乎当块中发生错误时,spring 回滚块并开始重试该块中的记录,以便它知道哪个记录有问题。所以你可以在你的个人作家身上做同样的事情。即捕获批量更新异常,然后一一重试,分离出错误记录
这里有几件事:
- 不要将
@Transactional
与 Spring Batch 一起使用 - Spring Batch 会为您管理事务,因此使用该注释会导致问题。不要使用它。 - 自己管理异常 - 在您描述的场景中,您想为同一项目调用四个
ItemWriter
实现,但想跳过异常在委托ItemWriter
级别,您需要编写自己的CompositeItemWriter
实现。 Spring 出于方便,批处理提供了这种级别的组合(我们将同一项目委托给每个ItemWriter
实现),但从框架的角度来看,它只是一个ItemWriter
。为了在子ItemWriter
级别处理异常,您需要编写自己的包装器并自行管理异常。
更新:
我所指的自定义 ItemWriter
的示例实现(请注意以下代码未经测试):
public class MyCompositeItemWriter<T> implements ItemWriter<T> {
private List<ItemWriter<? super T>> delegates;
@Override
public void write(List<? extends T> items) throws Exception {
for(ItemWriter delegate : delegates) {
try {
delegate.write(items);
}
catch (Exception e) {
// Do logging/error handling here
}
}
}
@Override
public void setDelegates(List<ItemWriter<? super T>> delegates) {
super.setDelegates(delegates);
this.delegates = delegates;
}
}
问题的主要原因是,我们试图使用两个不同的 ItemWriter
将数据写入同一个 table,这导致交易行为异常。
我们已经实现了 SkipListenets
(考虑到当我们在初始数据加载时执行验证时 use 可能不会经常获取垃圾或垃圾数据这一事实。)
由于我们在批处理作业中实施了 “Spring 批处理跳过技术”,这有助于我们指定某些异常类型和最大编号。跳过的项目,每当抛出其中一个可跳过的异常时,批处理作业不会失败,但会跳过该特定项目并转到下一个项目。只有当最大没有。达到跳过的项目数,批处理作业将失败。我们使用具有 “容错” 功能的跳过逻辑 Spring 批处理应用于 chunk-oriented 步骤中的项目,而不是整个步骤。
因此,如果项目未能在一个代表处写入,那么所有其他代表都将认为它失败(该项目不会传递给其他代表)并且我们对此很好,因为我们正在捕获详细信息错误日志 table,我们可以根据需要从那里重新处理它。