为什么 Flux.flatMap() 不等待内部发布者完成?

Why Flux.flatMap() doesn't wait for completion of inner publisher?

您能否解释一下 HttpClient.response() 返回的 Flux/Mono 到底发生了什么?我认为由 http 客户端生成的值在 Mono 完成之前不会传递到下游,但我看到生成了大量请求,最终以 reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 8 异常结束。如果我用 Mono.fromCallable { }.

替换对 testRequest() 的调用,它会按预期工作(项目正在一个接一个地处理)

我错过了什么?

测试代码:

import org.asynchttpclient.netty.util.ByteBufUtils
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
import reactor.netty.resources.ConnectionProvider

class Test {
    private val client = HttpClient.create(ConnectionProvider.create("meh", 4))

    fun main() {
        Flux.fromIterable(0..99)
                .flatMap { obj ->
                    println("Creating request for: $obj")
                    testRequest()
                            .doOnError { ex ->
                                println("Failed request for: $obj")
                                ex.printStackTrace()
                            }
                            .map { res ->
                                obj to res
                            }
                }
                .doOnNext { (obj, res) ->
                    println("Created request for: $obj ${res.length} characters")
                }
                .collectList().block()!!
    }

    fun testRequest(): Mono<String> {
        return client.get()
                .uri("https://projectreactor.io/docs/netty/release/reference/index.html#_connection_pool")
                .responseContent()
                .reduce(StringBuilder(), { sb, buf ->
                    val str= ByteBufUtils.byteBuf2String(Charsets.UTF_8, buf)
                    sb.append(str)
                })
                .map { it.toString() }
    }
}

当您像这样创建 ConnectionProviderConnectionProvider.create("meh", 4),这意味着连接池的最大连接数为 4,最大待处理请求数为 8。请参阅 here 了解更多信息。

当您使用 flatMap 时,这意味着 Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave 请参阅 here 更多相关信息。

那么发生的事情是您试图同时 运行 所有请求。

所以你有两个选择:

  • 如果您想使用 flatMap,请增加待处理请求的数量。
  • 如果您想保留待处理请求的数量,您可以考虑使用 concatMap 而不是 flatMap,这意味着 Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation。查看更多 here 相关信息。