为什么 Flux.flatMap() 不等待内部发布者完成?
Why Flux.flatMap() doesn't wait for completion of inner publisher?
您能否解释一下 HttpClient.response()
返回的 Flux/Mono 到底发生了什么?我认为由 http 客户端生成的值在 Mono 完成之前不会传递到下游,但我看到生成了大量请求,最终以 reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 8
异常结束。如果我用 Mono.fromCallable { }
.
替换对 testRequest()
的调用,它会按预期工作(项目正在一个接一个地处理)
我错过了什么?
测试代码:
import org.asynchttpclient.netty.util.ByteBufUtils
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
import reactor.netty.resources.ConnectionProvider
class Test {
private val client = HttpClient.create(ConnectionProvider.create("meh", 4))
fun main() {
Flux.fromIterable(0..99)
.flatMap { obj ->
println("Creating request for: $obj")
testRequest()
.doOnError { ex ->
println("Failed request for: $obj")
ex.printStackTrace()
}
.map { res ->
obj to res
}
}
.doOnNext { (obj, res) ->
println("Created request for: $obj ${res.length} characters")
}
.collectList().block()!!
}
fun testRequest(): Mono<String> {
return client.get()
.uri("https://projectreactor.io/docs/netty/release/reference/index.html#_connection_pool")
.responseContent()
.reduce(StringBuilder(), { sb, buf ->
val str= ByteBufUtils.byteBuf2String(Charsets.UTF_8, buf)
sb.append(str)
})
.map { it.toString() }
}
}
当您像这样创建 ConnectionProvider
时 ConnectionProvider.create("meh", 4)
,这意味着连接池的最大连接数为 4,最大待处理请求数为 8。请参阅 here 了解更多信息。
当您使用 flatMap
时,这意味着 Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave
请参阅 here 更多相关信息。
那么发生的事情是您试图同时 运行 所有请求。
所以你有两个选择:
- 如果您想使用
flatMap
,请增加待处理请求的数量。
- 如果您想保留待处理请求的数量,您可以考虑使用
concatMap
而不是 flatMap
,这意味着 Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation
。查看更多 here 相关信息。
您能否解释一下 HttpClient.response()
返回的 Flux/Mono 到底发生了什么?我认为由 http 客户端生成的值在 Mono 完成之前不会传递到下游,但我看到生成了大量请求,最终以 reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 8
异常结束。如果我用 Mono.fromCallable { }
.
testRequest()
的调用,它会按预期工作(项目正在一个接一个地处理)
我错过了什么?
测试代码:
import org.asynchttpclient.netty.util.ByteBufUtils
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
import reactor.netty.resources.ConnectionProvider
class Test {
private val client = HttpClient.create(ConnectionProvider.create("meh", 4))
fun main() {
Flux.fromIterable(0..99)
.flatMap { obj ->
println("Creating request for: $obj")
testRequest()
.doOnError { ex ->
println("Failed request for: $obj")
ex.printStackTrace()
}
.map { res ->
obj to res
}
}
.doOnNext { (obj, res) ->
println("Created request for: $obj ${res.length} characters")
}
.collectList().block()!!
}
fun testRequest(): Mono<String> {
return client.get()
.uri("https://projectreactor.io/docs/netty/release/reference/index.html#_connection_pool")
.responseContent()
.reduce(StringBuilder(), { sb, buf ->
val str= ByteBufUtils.byteBuf2String(Charsets.UTF_8, buf)
sb.append(str)
})
.map { it.toString() }
}
}
当您像这样创建 ConnectionProvider
时 ConnectionProvider.create("meh", 4)
,这意味着连接池的最大连接数为 4,最大待处理请求数为 8。请参阅 here 了解更多信息。
当您使用 flatMap
时,这意味着 Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave
请参阅 here 更多相关信息。
那么发生的事情是您试图同时 运行 所有请求。
所以你有两个选择:
- 如果您想使用
flatMap
,请增加待处理请求的数量。 - 如果您想保留待处理请求的数量,您可以考虑使用
concatMap
而不是flatMap
,这意味着Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation
。查看更多 here 相关信息。