JPA reader in Spring Batch

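I would like to know whether implementing a Spring Batch reader with JPA in the following way is recommended, or whether it would be better to look for another solution. If this approach is not recommended, where can I find information about better options?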

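import java.util.Iterator;

import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired;

public class CreditCardItemReader implements ItemReader<CreditCard> {

    @Autowired
    private CreditCardRepository repository;

    private Iterator<CreditCard> usersIterator;

    // Runs the query once before the step starts and keeps an iterator over its result.
    @BeforeStep
    public void before(StepExecution stepExecution) {
        usersIterator = repository.someQuery().iterator();
    }

    // Returns the next item, or null to signal the end of the input.
    @Override
    public CreditCard read() {
        if (usersIterator != null && usersIterator.hasNext()) {
            return usersIterator.next();
        } else {
            return null;
        }
    }
}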

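This implementation is only suitable for small data sets, because the data is read with a single query and the entire result list is held in memory. It is also not thread-safe.
When loading large volumes:

  • it can cause out-of-memory errors in environments with limited memory
  • it can cause performance problems, because a single call has to load thousands of records from the database before processing can start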


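Solution 1: org.springframework.batch.item.database.JpaCursorItemReader
Spring Batch provides a similar implementation out of the box: JpaCursorItemReader.
The main difference is that this implementation works with a specific JPQL query rather than a repository, and it uses JPA's Query.getResultStream() method to fetch the query results.
Implementation of JpaCursorItemReader: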

    protected void doOpen() throws Exception {
        ...
        Query query = createQuery();
        if (this.parameterValues != null) {
            this.parameterValues.forEach(query::setParameter);
        }
        this.iterator = query.getResultStream().iterator();
    }
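For example, Hibernate introduced the Query.getResultStream() method in version 5.2. It uses Hibernate's ScrollableResults implementation to move through the result set and fetch records in batches. This avoids loading all records of the result set at once and lets you process them more efficiently.
Creation example: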

    // The EntityManagerFactory is expected to be a fully configured bean
    // (for example, injected by Spring); it is passed in here as a parameter.
    protected ItemReader<Foo> getItemReader(EntityManagerFactory entityManagerFactory) throws Exception {
        String jpqlQuery = "from Foo";
        JpaCursorItemReader<Foo> itemReader = new JpaCursorItemReader<>();
        itemReader.setQueryString(jpqlQuery);
        itemReader.setEntityManagerFactory(entityManagerFactory);
        itemReader.setSaveState(true);
        // Validate the configuration once all properties have been set.
        itemReader.afterPropertiesSet();
        return itemReader;
    }

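Solution 2: org.springframework.batch.item.database.JpaPagingItemReader
This is a more flexible solution for JPQL queries than JpaCursorItemReader. The ItemReader loads and holds the data page by page, and it is thread-safe.
According to the documentation: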

ItemReader for reading database records built on top of JPA.

It executes the JPQL setQueryString(String) to retrieve requested data. The query is executed using paged requests of a size specified in AbstractPagingItemReader.setPageSize(int). Additional pages are requested when needed as AbstractItemCountingItemStreamItemReader.read() method is called, returning an object corresponding to current position.

The performance of the paging depends on the JPA implementation and its use of database specific features to limit the number of returned rows.

Setting a fairly large page size and using a commit interval that matches the page size should provide better performance.

In order to reduce the memory usage for large results the persistence context is flushed and cleared after each page is read. This causes any entities read to be detached. If you make changes to the entities and want the changes persisted then you must explicitly merge the entities.

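The implementation is thread-safe in between calls to AbstractItemCountingItemStreamItemReader.open(ExecutionContext), but remember to use saveState=false if used in a multi-threaded client (no restart available).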
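
To make the paging idea described above concrete, here is a minimal, framework-free sketch. It is not Spring Batch's code: the class PagedReader, the pageLoader function, and the pagesLoaded counter are hypothetical names introduced only for illustration. It assumes the loader never returns null and that a page shorter than pageSize means the data is exhausted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

/** Illustrative paging reader; all names here are hypothetical, not Spring Batch API. */
final class PagedReader<T> {

    // Assumed contract: (pageIndex, pageSize) -> rows of that page, never null.
    private final BiFunction<Integer, Integer, List<T>> pageLoader;
    private final int pageSize;

    private List<T> currentPage = new ArrayList<>();
    private int indexInPage = 0;
    private int nextPageIndex = 0;
    private boolean exhausted = false;
    private int pagesLoaded = 0;

    PagedReader(BiFunction<Integer, Integer, List<T>> pageLoader, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be greater than 0");
        }
        this.pageLoader = pageLoader;
        this.pageSize = pageSize;
    }

    /** Returns the next item, or null when the input is exhausted. */
    synchronized T read() {
        if (indexInPage >= currentPage.size()) {
            if (exhausted) {
                return null;
            }
            // Only one page is held in memory at a time.
            currentPage = pageLoader.apply(nextPageIndex++, pageSize);
            pagesLoaded++;
            indexInPage = 0;
            if (currentPage.size() < pageSize) {
                exhausted = true; // a short page is treated as the last page
            }
            if (currentPage.isEmpty()) {
                return null;
            }
        }
        return currentPage.get(indexInPage++);
    }

    /** Number of page requests issued so far. */
    synchronized int getPagesLoaded() {
        return pagesLoaded;
    }
}
```

The synchronized read() method is what keeps concurrent callers from skipping or duplicating items in this sketch. In Spring Batch itself you would not write this by hand; a JpaPagingItemReader is typically configured through JpaPagingItemReaderBuilder with a name, an EntityManagerFactory, a JPQL query string, and a page size.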

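Solution 3: org.springframework.batch.item.data.RepositoryItemReader
This is a more efficient solution. It works with a repository, loads and holds the data in chunks, and is thread-safe.
According to the documentation: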

A ItemReader that reads records utilizing a PagingAndSortingRepository.

Performance of the reader is dependent on the repository implementation, however setting a reasonably large page size and matching that to the commit interval should yield better performance.

The reader must be configured with a PagingAndSortingRepository, a Sort, and a pageSize greater than 0.

This implementation is thread-safe between calls to AbstractItemCountingItemStreamItemReader.open(ExecutionContext), but remember to use saveState=false if used in a multi-threaded client (no restart available).

Creation example:

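// fooRepository: the Spring Data repository bean (an interface extending
// PagingAndSortingRepository<Foo, Long>), injected by Spring rather than instantiated with new.
PagingAndSortingRepository<Foo, Long> repository = fooRepository;
RepositoryItemReader<Foo> reader = new RepositoryItemReader<>();
reader.setRepository(repository); // The PagingAndSortingRepository implementation used to read input from.
reader.setMethodName("findByName"); // Specifies what method on the repository to call.
reader.setArguments(arguments); // Arguments to be passed to the data providing method.
reader.setSort(Collections.singletonMap("id", Sort.Direction.ASC)); // A Sort is required; the "id" property is an assumption.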

Creation via the builder:

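// fooRepository: the Spring Data repository bean injected by Spring
// (a repository interface cannot be instantiated with new).
RepositoryItemReader<Foo> reader = new RepositoryItemReaderBuilder<Foo>()
        .name("fooItemReader") // Needed while saveState is true (the default); the name is an example.
        .repository(fooRepository)
        .methodName("findByName")
        .arguments(new ArrayList<>())
        .sorts(Collections.singletonMap("id", Sort.Direction.ASC)) // Required; the "id" property is an assumption.
        .build();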

More usage examples: RepositoryItemReaderTests and RepositoryItemReaderIntegrationTests

Summary:
Your implementation is only suitable for simple use cases.
I recommend using one of the out-of-the-box solutions.