正在处理 table 中的所有行作为 R2DB 中的 Flux 阻塞或锁定?

Is processing all the rows from a table as Flux blocking or locking in R2DB?

以下构造是否以任何可能的方式阻塞或锁定?这是使用 R2DB 的正确方法吗?如果不是,如何以反应方式处理 table 的所有记录?

我担心的是在整个通量被消耗之前数据库连接、table 和 Reactor 线程发生了什么。如果我阻塞了线程,或者如果我保持 table 锁定,或者数据库连接被阻塞。

目的是编写对table中所有行的批处理。对于每一行,我想执行一个 activity,其中包括从外部 Web 服务获取数据,最后在完全相同的 table.

中覆盖原始发票

假设单行处理方法(在此示例中 recalculateInvoice())是完全反应式的。

import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Mono;
import lombok.RequiredArgsConstructor;

@Repository
@RequiredArgsConstructor
public class DbConnector {
    private final DatabaseClient databaseClient;

    public Mono<Void> batchProcessing() {
        return databaseClient
                .sql("SELECT * FROM invoice")
                .fetch()
                .all()
                .delayElements(Duration.ofSeconds(10)) 
                // a costly operation with the data ...
                .then()
        ;
    }
}

您的代码片段是完全响应式的。它不会阻塞任何线程或数据库。这就是使用R2DBC的全部思路。

在幕后,R2DBC 驱动程序将执行如下操作:

Flux.usingWhen(connectionFactory.create(),
        connection -> 
            Flux.from(connection.createStatement("SELECT * FROM invoice").execute()),
        Connection::close);

一旦管道完成或发生错误,数据库连接将关闭。

您可以阅读 here 了解如何对此类查询应用背压。