这是 Reactor 的 Mono 中的错误吗?
Is this a bug in Reactor's Mono?
我在使用 Reactor 时注意到一些奇怪的行为。场景是这样的:
- 调用 rest API 端点,获取值,包装在 Mono
- 使用上述值调用另一个 rest API 端点,检索另一个值,包装在 Mono
- 压缩两个结果
似乎发生的是 onSubscribe(FluxMap.MapSubscriber) 在第一个 API 调用中被调用两次,然后打开两个连接并产生两个结果。传递给第二个 API 调用的结果是不确定的,取决于第二个 API 调用是在前两个调用中的第二个调用完成之前还是之后执行。
这是使用 Kotlin 和 Springboot WebClient 重现问题的代码示例。 API 终结点生成单个 GUID 或多个 GUID,具体取决于路径参数。我使用第一次调用结果中的第一个数字作为第二次调用中的路径参数:
val api = "https://www.uuidgenerator.net/api/guid"
val client = WebClient.builder()
.baseUrl(api)
.build()
@Test
public fun reactorBug() {
val firstResult = callApi().doOnSuccess { r -> println("callApi returned: $r") }
val secondResult = callApi(firstResult).doOnSuccess { r -> println("callApi(result) returned: $r") }
println(Mono.zip(firstResult, secondResult, { first, second -> "First result is ${first}Second result is $second" }).block())
}
private fun callApi(): Mono<String> {
println("Calling Api")
return client.get().retrieve().bodyToMono()
}
private fun callApi(number: Int): Mono<String> {
println("Calling Api with $number")
return client.get().uri("/{number}", number).retrieve().bodyToMono()
}
private fun callApi(firstResult: Mono<String>): Mono<String> {
println("Extracting number from first result")
return firstResult
.map { guid -> guid.find { c -> c.isDigit() } }
.map { Character.getNumericValue(it!!) }
.flatMap { i -> callApi(i) }
}
这是说明问题的示例输出:
Calling Api
Extracting number from first result
callApi returned: 12ec857b-e42c-42ab-a7a2-69beb9a377e3
callApi returned: 5eedefa5-73b5-4995-aef3-8621e31b698d <- this result shouldn't happen
Calling Api with 5 <- this should be 1, not 5
callApi(result) returned:
01c64488-6a8c-4400-9094-6729c64a4e1a
0179beae-d2b4-40b6-8489-52fa58deb25f
8f814b1d-594c-4392-a4f5-04d417367add
45891d71-61b2-4d5b-81ad-2cfd8e453377
08edf0c3-3614-402b-8b17-000fdedce1a0
First result is 12ec857b-e42c-42ab-a7a2-69beb9a377e3
Second result is
01c64488-6a8c-4400-9094-6729c64a4e1a
0179beae-d2b4-40b6-8489-52fa58deb25f
8f814b1d-594c-4392-a4f5-04d417367add
45891d71-61b2-4d5b-81ad-2cfd8e453377
08edf0c3-3614-402b-8b17-000fdedce1a0
编辑调试输出:
30-01-2018 22:36:11.889 [主要] 调试 o.s.web.reactive.function.client.debug - onSubscribe(FluxMap.MapSubscriber)
30-01-2018 22:36:11.920 [主要] 调试 o.s.web.reactive.function.client.debug - 请求(无界)
30-01-2018 22:36:11.924 [主要] 调试 io.netty.util.NetUtil.debug - -Djava.net.preferIPv4Stack: false
30-01-2018 22:36:11.925 [主要] 调试 io.netty.util.NetUtil.debug - -Djava.net.preferIPv6Addresses: false
30-01-2018 22:36:12.128 [main] DEBUG io.netty.util.NetUtil.debug - 环回接口:lo(软件环回接口 1、127.0.0.1)
30-01-2018 22:36:12.129 [main] DEBUG io.netty.util.NetUtil.debug - 无法从 sysctl 和文件 \proc\sys\net\core\somaxconn 获取 SOMAXCONN。默认值:200
30-01-2018 22:36:12.146 [main] DEBUG r.i.n.r.DefaultLoopEpollDetector.debug - 默认 epoll 支持:false
30-01-2018 22:36:12.156 [main] DEBUG r.i.n.resources.DefaultPoolResources.debug - www.uuidgenerator.net/173.255.225.224:443
的新 http 客户端池
30-01-2018 22:36:12.190 [main] DEBUG io.netty.channel.DefaultChannelId.debug - -Dio.netty.processId:4232(自动检测)
30-01-2018 22:36:12.396 [主要] 调试 io.netty.channel.DefaultChannelId.debug - -Dio.netty.machineId:78:e4:00:ff:fe:bf:a5:cb(自动检测)
30-01-2018 22:36:12.447 [主要] 调试 io.netty.buffer.ByteBufUtil.debug - -Dio.netty.allocator.type:合并
30-01-2018 22:36:12.448 [主要] 调试 io.netty.buffer.ByteBufUtil.debug - -Dio.netty.threadLocalDirectBufferSize: 65536
30-01-2018 22:36:12.448 [主要] 调试 io.netty.buffer.ByteBufUtil.debug - -Dio.netty.maxThreadLocalCharBufferSize: 16384
30-01-2018 22:36:12.459 [main] DEBUG r.i.n.c.PooledClientContextHandler.debug - 从池中获取现有通道:DefaultPromise@d23e4a(不完整)SimpleChannelPool{activeConnections=1}
30-01-2018 22:36:12.461 [主要] 调试 o.s.web.reactive.function.client.debug - onSubscribe(FluxMap.MapSubscriber)
30-01-2018 22:36:12.462 [主要] 调试 o.s.web.reactive.function.client.debug - 请求(无界)
30-01-2018 22:36:12.463 [main] DEBUG r.i.n.c.PooledClientContextHandler.debug - 从池中获取现有通道:DefaultPromise@c8295b(不完整)SimpleChannelPool{activeConnections=1}
30-01-2018 22:36:12.520 [reactor-http-nio-2] 调试 r.i.n.resources.DefaultPoolResources.debug - 已创建 [id: 0x88225196],现在有 2 个活动连接
30-01-2018 22:36:12.520 [reactor-http-nio-4] 调试 r.i.n.resources.DefaultPoolResources.debug - 已创建 [id: 0x80971ff0],现在有 2 个活动连接
为什么第一个 API 调用发生两次 - 这是一个错误还是这是 Mono 的预期行为?
Why does the first api call happen twice
zip
将订阅 firstResult
两次,一次是直接订阅,一次是通过 map-map-flatMap
链
在这种情况下您不需要 zip
,只需 flatMap
一遍又一遍:
val firstResult = callApi().doOnSuccess { r -> println("callApi returned: $r") }
val lastResult = firstResult
.flatMap { first ->
Mono.just(first)
.map { guid -> guid.find { c -> c.isDigit() } }
.map { Character.getNumericValue(it!!) }
.flatMap { i -> callApi(i) }
.map { second -> "First result is ${first}Second result is $second" }
}
lastResult.block()
我在使用 Reactor 时注意到一些奇怪的行为。场景是这样的:
- 调用 rest API 端点,获取值,包装在 Mono
- 使用上述值调用另一个 rest API 端点,检索另一个值,包装在 Mono
- 压缩两个结果
似乎发生的是 onSubscribe(FluxMap.MapSubscriber) 在第一个 API 调用中被调用两次,然后打开两个连接并产生两个结果。传递给第二个 API 调用的结果是不确定的,取决于第二个 API 调用是在前两个调用中的第二个调用完成之前还是之后执行。
这是使用 Kotlin 和 Springboot WebClient 重现问题的代码示例。 API 终结点生成单个 GUID 或多个 GUID,具体取决于路径参数。我使用第一次调用结果中的第一个数字作为第二次调用中的路径参数:
val api = "https://www.uuidgenerator.net/api/guid"
val client = WebClient.builder()
.baseUrl(api)
.build()
@Test
public fun reactorBug() {
val firstResult = callApi().doOnSuccess { r -> println("callApi returned: $r") }
val secondResult = callApi(firstResult).doOnSuccess { r -> println("callApi(result) returned: $r") }
println(Mono.zip(firstResult, secondResult, { first, second -> "First result is ${first}Second result is $second" }).block())
}
private fun callApi(): Mono<String> {
println("Calling Api")
return client.get().retrieve().bodyToMono()
}
private fun callApi(number: Int): Mono<String> {
println("Calling Api with $number")
return client.get().uri("/{number}", number).retrieve().bodyToMono()
}
private fun callApi(firstResult: Mono<String>): Mono<String> {
println("Extracting number from first result")
return firstResult
.map { guid -> guid.find { c -> c.isDigit() } }
.map { Character.getNumericValue(it!!) }
.flatMap { i -> callApi(i) }
}
这是说明问题的示例输出:
Calling Api
Extracting number from first result
callApi returned: 12ec857b-e42c-42ab-a7a2-69beb9a377e3
callApi returned: 5eedefa5-73b5-4995-aef3-8621e31b698d <- this result shouldn't happen
Calling Api with 5 <- this should be 1, not 5
callApi(result) returned:
01c64488-6a8c-4400-9094-6729c64a4e1a
0179beae-d2b4-40b6-8489-52fa58deb25f
8f814b1d-594c-4392-a4f5-04d417367add
45891d71-61b2-4d5b-81ad-2cfd8e453377
08edf0c3-3614-402b-8b17-000fdedce1a0
First result is 12ec857b-e42c-42ab-a7a2-69beb9a377e3
Second result is
01c64488-6a8c-4400-9094-6729c64a4e1a
0179beae-d2b4-40b6-8489-52fa58deb25f
8f814b1d-594c-4392-a4f5-04d417367add
45891d71-61b2-4d5b-81ad-2cfd8e453377
08edf0c3-3614-402b-8b17-000fdedce1a0
编辑调试输出:
30-01-2018 22:36:11.889 [主要] 调试 o.s.web.reactive.function.client.debug - onSubscribe(FluxMap.MapSubscriber)
30-01-2018 22:36:11.920 [主要] 调试 o.s.web.reactive.function.client.debug - 请求(无界)
30-01-2018 22:36:11.924 [主要] 调试 io.netty.util.NetUtil.debug - -Djava.net.preferIPv4Stack: false
30-01-2018 22:36:11.925 [主要] 调试 io.netty.util.NetUtil.debug - -Djava.net.preferIPv6Addresses: false
30-01-2018 22:36:12.128 [main] DEBUG io.netty.util.NetUtil.debug - 环回接口:lo(软件环回接口 1、127.0.0.1)
30-01-2018 22:36:12.129 [main] DEBUG io.netty.util.NetUtil.debug - 无法从 sysctl 和文件 \proc\sys\net\core\somaxconn 获取 SOMAXCONN。默认值:200
30-01-2018 22:36:12.146 [main] DEBUG r.i.n.r.DefaultLoopEpollDetector.debug - 默认 epoll 支持:false
30-01-2018 22:36:12.156 [main] DEBUG r.i.n.resources.DefaultPoolResources.debug - www.uuidgenerator.net/173.255.225.224:443
的新 http 客户端池
30-01-2018 22:36:12.190 [main] DEBUG io.netty.channel.DefaultChannelId.debug - -Dio.netty.processId:4232(自动检测)
30-01-2018 22:36:12.396 [主要] 调试 io.netty.channel.DefaultChannelId.debug - -Dio.netty.machineId:78:e4:00:ff:fe:bf:a5:cb(自动检测)
30-01-2018 22:36:12.447 [主要] 调试 io.netty.buffer.ByteBufUtil.debug - -Dio.netty.allocator.type:合并
30-01-2018 22:36:12.448 [主要] 调试 io.netty.buffer.ByteBufUtil.debug - -Dio.netty.threadLocalDirectBufferSize: 65536
30-01-2018 22:36:12.448 [主要] 调试 io.netty.buffer.ByteBufUtil.debug - -Dio.netty.maxThreadLocalCharBufferSize: 16384
30-01-2018 22:36:12.459 [main] DEBUG r.i.n.c.PooledClientContextHandler.debug - 从池中获取现有通道:DefaultPromise@d23e4a(不完整)SimpleChannelPool{activeConnections=1}
30-01-2018 22:36:12.461 [主要] 调试 o.s.web.reactive.function.client.debug - onSubscribe(FluxMap.MapSubscriber)
30-01-2018 22:36:12.462 [主要] 调试 o.s.web.reactive.function.client.debug - 请求(无界)
30-01-2018 22:36:12.463 [main] DEBUG r.i.n.c.PooledClientContextHandler.debug - 从池中获取现有通道:DefaultPromise@c8295b(不完整)SimpleChannelPool{activeConnections=1}
30-01-2018 22:36:12.520 [reactor-http-nio-2] 调试 r.i.n.resources.DefaultPoolResources.debug - 已创建 [id: 0x88225196],现在有 2 个活动连接
30-01-2018 22:36:12.520 [reactor-http-nio-4] 调试 r.i.n.resources.DefaultPoolResources.debug - 已创建 [id: 0x80971ff0],现在有 2 个活动连接
为什么第一个 API 调用发生两次 - 这是一个错误还是这是 Mono 的预期行为?
Why does the first api call happen twice
zip
将订阅 firstResult
两次,一次是直接订阅,一次是通过 map-map-flatMap
链
在这种情况下您不需要 zip
,只需 flatMap
一遍又一遍:
val firstResult = callApi().doOnSuccess { r -> println("callApi returned: $r") }
val lastResult = firstResult
.flatMap { first ->
Mono.just(first)
.map { guid -> guid.find { c -> c.isDigit() } }
.map { Character.getNumericValue(it!!) }
.flatMap { i -> callApi(i) }
.map { second -> "First result is ${first}Second result is $second" }
}
lastResult.block()