正在处理 table 中的所有行作为 R2DB 中的 Flux 阻塞或锁定?
Is processing all the rows from a table as Flux blocking or locking in R2DB?
以下构造是否以任何可能的方式阻塞或锁定?这是使用 R2DB 的正确方法吗?如果不是,如何以反应方式处理 table 的所有记录?
我担心的是在整个通量被消耗之前数据库连接、table 和 Reactor 线程发生了什么。如果我阻塞了线程,或者如果我保持 table 锁定,或者数据库连接被阻塞。
目的是编写对table中所有行的批处理。对于每一行,我想执行一个 activity,其中包括从外部 Web 服务获取数据,最后在完全相同的 table.
中覆盖原始发票
假设单行处理方法(在此示例中 recalculateInvoice()
)是完全反应式的。
import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Mono;
import lombok.RequiredArgsConstructor;
@Repository
@RequiredArgsConstructor
public class DbConnector {
private final DatabaseClient databaseClient;
public Mono<Void> batchProcessing() {
return databaseClient
.sql("SELECT * FROM invoice")
.fetch()
.all()
.delayElements(Duration.ofSeconds(10))
// a costly operation with the data ...
.then()
;
}
}
您的代码片段是完全响应式的。它不会阻塞任何线程或数据库。这就是使用R2DBC的全部思路。
在幕后,R2DBC 驱动程序将执行如下操作:
Flux.usingWhen(connectionFactory.create(),
connection ->
Flux.from(connection.createStatement("SELECT * FROM invoice").execute()),
Connection::close);
一旦管道完成或发生错误,数据库连接将关闭。
您可以阅读 here 了解如何对此类查询应用背压。
以下构造是否以任何可能的方式阻塞或锁定?这是使用 R2DB 的正确方法吗?如果不是,如何以反应方式处理 table 的所有记录?
我担心的是在整个通量被消耗之前数据库连接、table 和 Reactor 线程发生了什么。如果我阻塞了线程,或者如果我保持 table 锁定,或者数据库连接被阻塞。
目的是编写对table中所有行的批处理。对于每一行,我想执行一个 activity,其中包括从外部 Web 服务获取数据,最后在完全相同的 table.
中覆盖原始发票假设单行处理方法(在此示例中 recalculateInvoice()
)是完全反应式的。
import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Mono;
import lombok.RequiredArgsConstructor;
@Repository
@RequiredArgsConstructor
public class DbConnector {
private final DatabaseClient databaseClient;
public Mono<Void> batchProcessing() {
return databaseClient
.sql("SELECT * FROM invoice")
.fetch()
.all()
.delayElements(Duration.ofSeconds(10))
// a costly operation with the data ...
.then()
;
}
}
您的代码片段是完全响应式的。它不会阻塞任何线程或数据库。这就是使用R2DBC的全部思路。
在幕后,R2DBC 驱动程序将执行如下操作:
Flux.usingWhen(connectionFactory.create(),
connection ->
Flux.from(connection.createStatement("SELECT * FROM invoice").execute()),
Connection::close);
一旦管道完成或发生错误,数据库连接将关闭。
您可以阅读 here 了解如何对此类查询应用背压。