R2DBC Statement.fetchsize exaclty 是如何工作的

How R2DBC Statement.fetchsize exaclty works

我正在使用 r2dbc-postgresql 驱动程序。假设我们有一个包含 1000 条记录的 table 而 fetchSize 是 100:

connectionMono.flatMapMany(
                connection -> connection
                        .createStatement("select age from users")
                        .fetchSize(100)
                        .execute())

将执行多少个网络调用?我知道使用 JDBC Statement.SetFetchsize,驱动程序将分 10 批获取所有行,每批 100 行。

查看 r2dbc 驱动程序中的代码,行为是相同的:它按指定大小的块获取行,因此在您的情况下为 100。

这是ExtendedQueryMessageFlow中处理方法的代码:

    /**
     * Execute the query and indicate to fetch rows in chunks with the {@link Execute} message.
     *
     * @param bindFlow  the initial bind flow
     * @param client    client to use
     * @param portal    the portal
     * @param fetchSize fetch size per roundtrip
     * @return the resulting message stream
     */
    private static Flux<BackendMessage> fetchCursored(Flux<FrontendMessage> bindFlow, Client client, String portal, int fetchSize) {

        DirectProcessor<FrontendMessage> requestsProcessor = DirectProcessor.create();
        FluxSink<FrontendMessage> requestsSink = requestsProcessor.sink();
        AtomicBoolean isCanceled = new AtomicBoolean(false);

        return client.exchange(bindFlow.concatWithValues(new CompositeFrontendMessage(new Execute(portal, fetchSize), Flush.INSTANCE)).concatWith(requestsProcessor))
            .handle((BackendMessage message, SynchronousSink<BackendMessage> sink) -> {
                if (message instanceof CommandComplete) {
                    requestsSink.next(new Close(portal, PORTAL));
                    requestsSink.next(Sync.INSTANCE);
                    requestsSink.complete();
                    sink.next(message);
                } else if (message instanceof ErrorResponse) {
                    requestsSink.next(Sync.INSTANCE);
                    requestsSink.complete();
                    sink.next(message);
                } else if (message instanceof PortalSuspended) {
                    if (isCanceled.get()) {
                        requestsSink.next(new Close(portal, PORTAL));
                        requestsSink.next(Sync.INSTANCE);
                        requestsSink.complete();
                    } else {
                        requestsSink.next(new Execute(portal, fetchSize));
                        requestsSink.next(Flush.INSTANCE);
                    }
                } else {
                    sink.next(message);
                }
            })
            .as(flux -> Operators.discardOnCancel(flux, () -> isCanceled.set(true)));
    }