Reactors Parallel Flux + Timeouts 的背压
Backpressure with Reactors Parallel Flux + Timeouts
我目前正在研究在 Flux 中使用并行性。现在我遇到了背压问题。在我们的例子中,我们有一个我们想要消费的快速生产服务,但我们要慢得多。
对于正常的通量,到目前为止这是可行的,但我们希望具有并行性。当我使用
的方法时我看到了什么
.parallel(2)
.runOn(Schedulers.parallel())
一开始有一个很大的请求,需要很长时间才能处理。这里也出现了一个不同的问题,如果我们处理时间太长,我们似乎会在生产者服务中生成一个取消事件(我们通过 webflux rest-call 使用它),但在消费者中看不到取消事件。
但回到问题 1,如何才能使这件事恢复同步。我知道 .parallel()
方法中的 prefetch 参数,但它没有像我预期的那样工作。
一个最小的例子是这样的
fun main() {
val atomicInteger = AtomicInteger(0)
val receivedCount = AtomicInteger(0)
val processedCount = AtomicInteger(0)
Flux.generate<Int> {
it.next(atomicInteger.getAndIncrement())
println("Emitted ${atomicInteger.get()}")
}.doOnEach { it.get()?.let { receivedCount.addAndGet(1) } }
.parallel(2, 1)
.runOn(Schedulers.parallel())
.flatMap {
Thread.sleep(200)
log("Work on $it")
processedCount.addAndGet(1)
Mono.just(it * 2)
}.subscribe {
log("Received ${receivedCount.get()} and processed ${processedCount.get()}")
}
Thread.sleep(25000)
}
在哪里可以观察到这样的日志
...
Emitted 509
Emitted 510
Emitted 511
Emitted 512
Emitted 513
2022-02-02T14:12:58.164465Z - Thread[parallel-1,5,main] Work on 0
2022-02-02T14:12:58.168469Z - Thread[parallel-2,5,main] Work on 1
2022-02-02T14:12:58.241966Z - Thread[parallel-1,5,main] Received 513 and processed 2
2022-02-02T14:12:58.241980Z - Thread[parallel-2,5,main] Received 513 and processed 2
2022-02-02T14:12:58.442218Z - Thread[parallel-2,5,main] Work on 3
2022-02-02T14:12:58.442215Z - Thread[parallel-1,5,main] Work on 2
2022-02-02T14:12:58.442315Z - Thread[parallel-2,5,main] Received 513 and processed 3
2022-02-02T14:12:58.442338Z - Thread[parallel-1,5,main] Received 513 and processed 4
那么我该如何调整我可以使用并行性但与我的制作人保持 backpressure/sync 的东西?我让它工作的唯一方法是在 parallelFlux 之前获取信号量并在工作后释放,但这并不是一个很好的解决方案。
好的,对于这个 szenario,parallel 和 runOn 的预取必须下注非常低,这里是 1,这似乎很重要。
使用 256 的默认值,我们向我们的生产者请求了太多,因此已经有一个取消事件,因为在第一个获取预取的请求块和下一个 Flux 决定再次填充缓冲区的请求块之间的时间很长。
我目前正在研究在 Flux 中使用并行性。现在我遇到了背压问题。在我们的例子中,我们有一个我们想要消费的快速生产服务,但我们要慢得多。 对于正常的通量,到目前为止这是可行的,但我们希望具有并行性。当我使用
的方法时我看到了什么.parallel(2)
.runOn(Schedulers.parallel())
一开始有一个很大的请求,需要很长时间才能处理。这里也出现了一个不同的问题,如果我们处理时间太长,我们似乎会在生产者服务中生成一个取消事件(我们通过 webflux rest-call 使用它),但在消费者中看不到取消事件。
但回到问题 1,如何才能使这件事恢复同步。我知道 .parallel()
方法中的 prefetch 参数,但它没有像我预期的那样工作。
一个最小的例子是这样的
fun main() {
val atomicInteger = AtomicInteger(0)
val receivedCount = AtomicInteger(0)
val processedCount = AtomicInteger(0)
Flux.generate<Int> {
it.next(atomicInteger.getAndIncrement())
println("Emitted ${atomicInteger.get()}")
}.doOnEach { it.get()?.let { receivedCount.addAndGet(1) } }
.parallel(2, 1)
.runOn(Schedulers.parallel())
.flatMap {
Thread.sleep(200)
log("Work on $it")
processedCount.addAndGet(1)
Mono.just(it * 2)
}.subscribe {
log("Received ${receivedCount.get()} and processed ${processedCount.get()}")
}
Thread.sleep(25000)
}
在哪里可以观察到这样的日志
...
Emitted 509
Emitted 510
Emitted 511
Emitted 512
Emitted 513
2022-02-02T14:12:58.164465Z - Thread[parallel-1,5,main] Work on 0
2022-02-02T14:12:58.168469Z - Thread[parallel-2,5,main] Work on 1
2022-02-02T14:12:58.241966Z - Thread[parallel-1,5,main] Received 513 and processed 2
2022-02-02T14:12:58.241980Z - Thread[parallel-2,5,main] Received 513 and processed 2
2022-02-02T14:12:58.442218Z - Thread[parallel-2,5,main] Work on 3
2022-02-02T14:12:58.442215Z - Thread[parallel-1,5,main] Work on 2
2022-02-02T14:12:58.442315Z - Thread[parallel-2,5,main] Received 513 and processed 3
2022-02-02T14:12:58.442338Z - Thread[parallel-1,5,main] Received 513 and processed 4
那么我该如何调整我可以使用并行性但与我的制作人保持 backpressure/sync 的东西?我让它工作的唯一方法是在 parallelFlux 之前获取信号量并在工作后释放,但这并不是一个很好的解决方案。
好的,对于这个 szenario,parallel 和 runOn 的预取必须下注非常低,这里是 1,这似乎很重要。 使用 256 的默认值,我们向我们的生产者请求了太多,因此已经有一个取消事件,因为在第一个获取预取的请求块和下一个 Flux 决定再次填充缓冲区的请求块之间的时间很长。