How exactly does R2DBC Statement.fetchSize work?
I am using the r2dbc-postgresql driver. Suppose we have a table with 1000 records and the fetchSize is 100:
connectionMono.flatMapMany(
    connection -> connection
        .createStatement("select age from users")
        .fetchSize(100)
        .execute())
How many network calls will be executed? I know that with JDBC Statement.setFetchSize, the driver fetches all rows in 10 batches of 100 rows each.
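Looking at the code in the r2dbc driver, the behavior is the same: it fetches rows in chunks of the specified size, so 100 in your case.
Here is the code of the method in ExtendedQueryMessageFlow that handles this: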
/**
 * Execute the query and indicate to fetch rows in chunks with the {@link Execute} message.
 *
 * @param bindFlow  the initial bind flow
 * @param client    client to use
 * @param portal    the portal
 * @param fetchSize fetch size per roundtrip
 * @return the resulting message stream
 */
private static Flux<BackendMessage> fetchCursored(Flux<FrontendMessage> bindFlow, Client client, String portal, int fetchSize) {
    DirectProcessor<FrontendMessage> requestsProcessor = DirectProcessor.create();
    FluxSink<FrontendMessage> requestsSink = requestsProcessor.sink();
    AtomicBoolean isCanceled = new AtomicBoolean(false);
    return client.exchange(bindFlow.concatWithValues(new CompositeFrontendMessage(new Execute(portal, fetchSize), Flush.INSTANCE)).concatWith(requestsProcessor))
        .handle((BackendMessage message, SynchronousSink<BackendMessage> sink) -> {
            if (message instanceof CommandComplete) {
                requestsSink.next(new Close(portal, PORTAL));
                requestsSink.next(Sync.INSTANCE);
                requestsSink.complete();
                sink.next(message);
            } else if (message instanceof ErrorResponse) {
                requestsSink.next(Sync.INSTANCE);
                requestsSink.complete();
                sink.next(message);
            } else if (message instanceof PortalSuspended) {
                if (isCanceled.get()) {
                    requestsSink.next(new Close(portal, PORTAL));
                    requestsSink.next(Sync.INSTANCE);
                    requestsSink.complete();
                } else {
                    requestsSink.next(new Execute(portal, fetchSize));
                    requestsSink.next(Flush.INSTANCE);
                }
            } else {
                sink.next(message);
            }
        })
        .as(flux -> Operators.discardOnCancel(flux, () -> isCanceled.set(true)));
}
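To make the round-trip count concrete, the loop above can be modelled with a small standalone simulation. This is only a sketch, not driver code: the class FetchSizeModel, its Result type, and the simulate method are hypothetical names introduced for illustration. It also assumes that the server answers an Execute with PortalSuspended whenever it returned exactly fetchSize rows, and with CommandComplete when it returned fewer, which is my reading of the PostgreSQL extended query protocol.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model of the Execute / PortalSuspended loop in fetchCursored.
 * Hypothetical illustration only; it does not talk to a database.
 */
public class FetchSizeModel {

    /** Outcome of one simulated query. */
    public static final class Result {
        public final int executeMessages;      // Execute messages the client sent
        public final List<Integer> batchSizes; // rows returned for each Execute

        Result(int executeMessages, List<Integer> batchSizes) {
            this.executeMessages = executeMessages;
            this.batchSizes = batchSizes;
        }
    }

    public static Result simulate(int totalRows, int fetchSize) {
        if (totalRows < 0 || fetchSize <= 0) {
            throw new IllegalArgumentException("totalRows must be >= 0 and fetchSize > 0");
        }
        int remaining = totalRows;
        int executes = 0;
        List<Integer> batches = new ArrayList<>();
        boolean commandComplete = false;
        while (!commandComplete) {
            executes++; // client sends Execute(portal, fetchSize) followed by Flush
            int returned = Math.min(remaining, fetchSize);
            remaining -= returned;
            batches.add(returned);
            // Assumption: hitting the row limit yields PortalSuspended (loop again),
            // returning fewer rows than the limit yields CommandComplete (stop).
            commandComplete = returned < fetchSize;
        }
        return new Result(executes, batches);
    }

    public static void main(String[] args) {
        Result r = simulate(1000, 100);
        System.out.println(r.executeMessages + " Execute messages, batch sizes: " + r.batchSizes);
    }
}
```

Under these assumptions, 1000 rows with a fetch size of 100 arrive in 10 batches of 100 rows, followed by one more Execute that returns no rows and ends with CommandComplete. Each Execute is one request to the server; how many TCP packets carry a 100-row reply depends on the row size and is not modelled here.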