Reactors Parallel Flux + Timeouts 的背压

Backpressure with Reactors Parallel Flux + Timeouts

我目前正在研究在 Flux 中使用并行性。现在我遇到了背压问题。在我们的例子中,我们有一个我们想要消费的快速生产服务,但我们要慢得多。 对于正常的通量,到目前为止这是可行的,但我们希望具有并行性。当我使用

的方法时我看到了什么
.parallel(2)
.runOn(Schedulers.parallel())

一开始有一个很大的请求,需要很长时间才能处理。这里也出现了一个不同的问题,如果我们处理时间太长,我们似乎会在生产者服务中生成一个取消事件(我们通过 webflux rest-call 使用它),但在消费者中看不到取消事件。

但回到问题 1,如何才能使这件事恢复同步。我知道 .parallel() 方法中的 prefetch 参数,但它没有像我预期的那样工作。

一个最小的例子是这样的

fun main() {
    val atomicInteger = AtomicInteger(0)
    val receivedCount = AtomicInteger(0)
    val processedCount = AtomicInteger(0)
    Flux.generate<Int> {
        it.next(atomicInteger.getAndIncrement())
        println("Emitted ${atomicInteger.get()}")
    }.doOnEach { it.get()?.let { receivedCount.addAndGet(1) } }
        .parallel(2, 1)
        .runOn(Schedulers.parallel())
        .flatMap {
            Thread.sleep(200)
            log("Work on $it")
            processedCount.addAndGet(1)
            Mono.just(it * 2)
        }.subscribe {
            log("Received ${receivedCount.get()} and processed ${processedCount.get()}")
        }

    Thread.sleep(25000)
}

在哪里可以观察到这样的日志

...
Emitted 509
Emitted 510
Emitted 511
Emitted 512
Emitted 513
2022-02-02T14:12:58.164465Z - Thread[parallel-1,5,main] Work on 0
2022-02-02T14:12:58.168469Z - Thread[parallel-2,5,main] Work on 1
2022-02-02T14:12:58.241966Z - Thread[parallel-1,5,main] Received 513 and processed 2
2022-02-02T14:12:58.241980Z - Thread[parallel-2,5,main] Received 513 and processed 2
2022-02-02T14:12:58.442218Z - Thread[parallel-2,5,main] Work on 3
2022-02-02T14:12:58.442215Z - Thread[parallel-1,5,main] Work on 2
2022-02-02T14:12:58.442315Z - Thread[parallel-2,5,main] Received 513 and processed 3
2022-02-02T14:12:58.442338Z - Thread[parallel-1,5,main] Received 513 and processed 4

那么我该如何调整我可以使用并行性但与我的制作人保持 backpressure/sync 的东西?我让它工作的唯一方法是在 parallelFlux 之前获取信号量并在工作后释放,但这并不是一个很好的解决方案。

好的,对于这个 szenario,parallel 和 runOn 的预取必须下注非常低,这里是 1,这似乎很重要。 使用 256 的默认值,我们向我们的生产者请求了太多,因此已经有一个取消事件,因为在第一个获取预取的请求块和下一个 Flux 决定再次填充缓冲区的请求块之间的时间很长。