将事务传播到 Forkjoin 提交

Propagate transaction to Forkjoin submit

我正在创建一个具有多个线程的 ForkJoinPool 来并行执行一个流,它是从 jpa 中的查询执行的,但是我在将事务传播到 ForkJoinPool 的方法提交时遇到了问题。

@Transactional(readOnly = true)
public void streamTest() {
    ForkJoinPool customThreadPool = new ForkJoinPool(20);
    try {
    customThreadPool.submit(() ->
         priceRepository.streamAll()
         .parallel()
         .map(p -> this.transform(p))
         .forEach(System.out::println)
         ).get();
    } catch (InterruptedException | ExecutionException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

我收到错误:"You're trying to execute a streaming query method without a surrounding transaction that keeps the connection open so that the Stream can actually be consumed. Make sure the code consuming the stream uses @Transactional or any other way of declaring a (read-only) transaction."

如果我取下 ForkJoinPool 来执行流,它工作正常。 如何将事务(只读)传播到从 ForkJoinPool 提交的方法的执行,有什么办法吗?

我发现了如何在 ForkJoinPool 的任务中设置事务。 我只需像下面那样使用 TransactionSynchronizationManager。

@Transactional(readOnly = true)
public void streamTest() {
ForkJoinPool customThreadPool = new ForkJoinPool(20);
try {
customThreadPool.submit(() -> {
    TransactionSynchronizationManager.setActualTransactionActive(true);
    TransactionSynchronizationManager.setCurrentTransactionReadOnly(true);
    TransactionSynchronizationManager.initSynchronization();
     priceRepository.streamAll()
     .parallel()
     .map(p -> this.transform(p))
     .forEach(System.out::println);
     }).get();
} catch (InterruptedException | ExecutionException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
}

}