Spring 批处理:一个步骤中的多个项目阅读器

Spring Batch: Multiple Item Readers in a single step

我是spring批次的新手。 spring批次我需要完成的任务如下:

  1. 需要从数据库中读取一些元数据。
  2. 根据这个元数据,我需要读取一些文件。
  3. 经过一些处理后,需要将这些值从文件写入数据库。

我的查询如下:

一个。对于第一个要求,我需要将整个结果集映射到单个对象,其中 Person 相关数据在 1 table 中,Pets 相关数据在另一个 table 中,并由 person id 连接。

public class PersonPetDetails {

    private String personName;
    private String personAddr;

    private int personAge;

    private List<Pet> pets;

为此,我编写了一个自定义项 reader,它扩展了 JdbcCursorItemReader。

public class CustomJDBCCusrorItemReader<T> extends JdbcCursorItemReader<T> {

    private ResultSetExtractor<T> resultSetExtractor;


    public void setResultSetExtractor(ResultSetExtractor<T> resultSetExtractor) {
        this.resultSetExtractor = resultSetExtractor;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        setVerifyCursorPosition(false);
        Assert.notNull(getDataSource(), "DataSource must be provided");
        Assert.notNull(getSql(), "The SQL query must be provided");
        Assert.notNull(resultSetExtractor, "ResultSetExtractor must be provided");
    }


    @Override
    protected T readCursor(ResultSet rs, int currentRow) throws SQLException {      
        return resultSetExtractor.extractData(rs);
    }
}

这是达到我要求的正确方法吗?或者有更好的方法吗?

b。 AFAIK,在 spring 批处理中,如果没有作者,就不可能只有 reader 的步骤。因此,我无法在作业的不同步骤中调用另一组 reader。那么如何一步调用多个reader呢?

c。此外,根据某些情况,我可能需要调用第三组 Reader。如何在一个步骤中有条件地调用 reader?

感谢您阅读我的 post。我知道它很长。任何帮助深表感谢。另外,我想一个示例代码片段可以帮助我更好地理解这一点。 :)

我会推荐如下

高级设计:

  1. 分区器 它将处理人员名单。注:此时没有拉取宠物数据

  2. Reader 它将获得属于一个人的宠物列表。注意:Reader 将 return 仅针对某个人的宠物列表。

  3. 处理器 基于宠物人,您将根据您的要求进行处理。

  4. 作家 根据您写入数据库的要求。

低级代码片段:

  1. Partitioner

    public class PetPersonPartitioner implements Partitioner {
    
      @Autowired
      private PersonDAO personDAO;
    
      @Override
      public Map<String, ExecutionContext> partition(int gridSize) {
    
        Map<String, ExecutionContext> queue = new HashMap<String, ExecutionContext>();
    
        List<Person> personList = this.personDAO.getAllPersons();
        for (Person person : personList) {
    
          ExecutionContext ec = new ExecutionContext();
          ec.put("person", person);
          ec.put("personId", person.getId());
    
          queue.put(person.getId(), ec);
        }
    
      return queue;
      }
    }
    
  2. Reader

    <bean id="petByPersonIdRowMapper" class="yourpackage.PetByPersonIdRowMapper" />
    
    <bean id="petByPesonIdStatementSetter" scope="step"
          class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
        <property name="parameters">
            <list>
                <value>#{stepExecutionContext['personId']}</value>
            </list>
        </property>
    </bean>
    
public class PetByPersonIdRowMapper implements RowMapper<PersonPetDetails> {
    @Override
    public BillingFeeConfigEntity mapRow(ResultSet rs, int rowNum) throws SQLException {
        PersonPetDetails record = new PersonPetDetails();

        record.setPersonId(rs.getLong("personId"));
      record.setPetId(rs.getLong("petid");
      ...
      ...
}
  1. Processor 您可以继续处理每个 PersonPetDetails 对象。