如何覆盖 Spring Batch CompositeItemWriter manage transaction for delegate writers 在出现异常的情况下?

How to override Spring Batch CompositeItemWriter manage transaction for delegate writers in case of exception arises?

我在这里扩展这个 问题:

在我的例子中,我有一个下面的 CompositeItemWriter,它将数据写入同一数据库的多个 table,在写入数据之前,它通过实施各种业务规则来转换数据。这里一条记录可能满足不同的业务规则等。因此一个作者可能比其他人获得更多数据。

@Bean
public CompositeItemWriter<Employee> EmployeeCompositeWriter() throws Exception {
    List<ItemWriter<? super Employee>> employee = new ArrayList<>();
    employee.add(employeeWriter());
    employee.add(departmentWriter());
    employee.add(stockWriter());
    employee.add(purchaseWriter());

    CompositeItemWriter<Employee> compositeItemWriter = new CompositeItemWriter<>();
    compositeItemWriter.setDelegates(employee);
    compositeItemWriter.afterPropertiesSet();
    return compositeItemWriter;
}

场景 - 假设第一个作者工作得很好,第二个作者产生异常,然后第三个和第四个作者没有被调用这是Automic自然默认的Spring 由于事务回滚而发生批处理。

这里即使2nd writer出现任何异常,我也想成功调用3rd和4th writer并保存数据,我也想成功保存1st writer和2nd writer的数据..只有异常数据我想要在 SkipListener 的帮助下存储到错误 Table 中,以识别哪些记录是垃圾记录。

解决方案-为了实现上述场景,我们在每个编写器写入方法上添加了@Transactional(propagation = Propagation.REQUIRES_NEW),第一个编写器现在保存数据,第二个编写器生成异常(使用namedJdbcTemplate.batchUpdate() 以批量更新数据)我们正在缓存它并重新抛出它,但我们可以看到 提交级别降低到 1 (偏离路线以识别确切的垃圾记录)并且当第二个作者再次出现异常时,第一个作者被调用并保存重复数据,第二个、第三个和第四个作者被调用,而且垃圾记录没有流向第三个和第四个作者。

在这里,如果一条或几条记录是垃圾,我不希望整个批处理作业停止,因为这个作业对我们每次 运行 都是至关重要的。有没有什么办法可以保存所有没有出现异常的数据,并且在可能的情况下借助SkipListener或任何其他方式仅将异常数据保存到错误table中?

有什么方法可以将任何步骤的(READER 或处理器)部分的批处理组件重用到另一个步骤?

我看不出有什么方法可以让 spring-batch 将整个块写成原子的单个事务与你的想法保持原子性,只要你想 skiplistener.

我不确定这是否可行,但您可以快速测试一下。这就是消息在某些集成框架(如骆驼)中从一个处理器到错误处理流程中携带异常的方式。

  • 您的项目 reader 应该 return 一个 EmployeeWrapper,其中包含 employee 记录并有一个存储异常的字段。

  • 您的 CompositeItemWriter 收到 List<EmployeeWrapper> 并且复合编写器有 5 个编写器而不是 4 个。第 5 个编写器将执行您的 SkipListener 会执行的操作。

    List<ItemWriter<? super EmployeeWrapper>> employee = new ArrayList<>();
    employee.add(employeeWriter());
    employee.add(departmentWriter());
    employee.add(stockWriter());
    employee.add(purchaseWriter());
    employee.add(errorRecordWriter());
  • 您的前 4 位作者从不抛出异常,而是将其标记为已处理但将捕获的异常添加为 EmployeeWrapper 的属性。

  • 您的第 5 个 errorRecordWriter 接收所有记录,检查任何添加了异常属性的记录并将它们写入错误 table。如果写入错误记录失败,你可以抛出异常并重试所有 5 个写入器。

  • 关于批量更新失败时如何知道哪条记录是错误记录。似乎当块中发生错误时,spring 回滚块并开始重试该块中的记录,以便它知道哪个记录有问题。所以你可以在你的个人作家身上做同样的事情。即捕获批量更新异常,然后一一重试,分离出错误记录

这里有几件事:

  1. 不要将 @Transactional 与 Spring Batch 一起使用 - Spring Batch 会为您管理事务,因此使用该注释会导致问题。不要使用它。
  2. 自己管理异常 - 在您描述的场景中,您想为同一项目调用四个 ItemWriter 实现,但想跳过异常在委托 ItemWriter 级别,您需要编写自己的 CompositeItemWriter 实现。 Spring 出于方便,批处理提供了这种级别的组合(我们将同一项目委托给每个 ItemWriter 实现),但从框架的角度来看,它只是一个 ItemWriter。为了在子 ItemWriter 级别处理异常,您需要编写自己的包装器并自行管理异常。

更新:
我所指的自定义 ItemWriter 的示例实现(请注意以下代码未经测试):

public class MyCompositeItemWriter<T> implements ItemWriter<T> {
      private List<ItemWriter<? super T>> delegates;
 
    @Override
      public void write(List<? extends T> items) throws Exception {
            for(ItemWriter delegate : delegates) {
               try {
                  delegate.write(items);
               }
               catch (Exception e) {
                  // Do logging/error handling here
               }
            }
    }

    @Override
    public void setDelegates(List<ItemWriter<? super T>> delegates) {
        super.setDelegates(delegates);
        this.delegates = delegates;
    }
}

问题的主要原因是,我们试图使用两个不同的 ItemWriter 将数据写入同一个 table,这导致交易行为异常。

我们已经实现了 SkipListenets(考虑到当我们在初始数据加载时执行验证时 use 可能不会经常获取垃圾或垃圾数据这一事实。)

由于我们在批处理作业中实施了 “Spring 批处理跳过技术”,这有助于我们指定某些异常类型和最大编号。跳过的项目,每当抛出其中一个可跳过的异常时,批处理作业不会失败,但会跳过该特定项目并转到下一个项目。只有当最大没有。达到跳过的项目数,批处理作业将失败。我们使用具有 “容错” 功能的跳过逻辑 Spring 批处理应用于 chunk-oriented 步骤中的项目,而不是整个步骤。

因此,如果项目未能在一个代表处写入,那么所有其他代表都将认为它失败(该项目不会传递给其他代表)并且我们对此很好,因为我们正在捕获详细信息错误日志 table,我们可以根据需要从那里重新处理它。