MissingBackpressureException:由于缺少请求而无法发出缓冲区

MissingBackpressureException: Could not emit buffer due to lack of requests

我收到了一份错误报告,其中包括 MissingBackpressureException: Could not emit buffer due to lack of requests 用于 RxJava Flowable,但我正在努力创建一个简单的测试用例来演示问题(维护 Flowable).

这是我试图放在一起的测试,它在管道中保持相同的阶段:

int inputEvents=10000;

CountDownLatch completed = new CountDownLatch(1);
Flowable<List<String>> flowable = Flowable.<String>create(e -> {

    System.out.println(Thread.currentThread().getName() + ": Will send");
    for (int counter = 0; counter < inputEvents; counter++) {
        e.onNext("" + counter);
        Thread.sleep(5);
    }
    System.out.println(Thread.currentThread().getName() + ": Completed sending");
    e.onComplete();
}, BackpressureStrategy.DROP)
    .onBackpressureDrop(s -> System.out.println("Backpressure, dropping " + Arrays.asList(s)))
    .buffer(1, TimeUnit.SECONDS)
    .doOnNext(strings -> System.out.println("\t" + Thread.currentThread().getName() + ": Buffered: " + strings.size() + " items"))
    .observeOn(Schedulers.io(), false)
    .doOnNext(strings -> {
        System.out.println("\t" + "\t" + Thread.currentThread().getName() + ": Waiting: " + strings.size());
        Thread.sleep(5000);
    });

flowable
    .subscribe(s -> System.out.println("\t" + "\t" + "onNext: " + s.size()),
            error -> {
                throw new RuntimeException(error);
            },
            () -> {
                System.out.println("\t" + "\t" + "Complete");
                completed.countDown();
            });

completed.await();

在生产中,我们得到 MissingBackpressureException: Could not emit buffer due to lack of requests 具有以下堆栈跟踪:

io.reactivex.rxjava3.exceptions.MissingBackpressureException: Could not emit buffer due to lack of requests
        at io.reactivex.rxjava3.internal.subscribers.QueueDrainSubscriber.fastPathEmitMax(QueueDrainSubscriber.java:87)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableBufferTimed$BufferExactUnboundedSubscriber.run(FlowableBufferTimed.java:207)
        at io.reactivex.rxjava3.internal.schedulers.ScheduledDirectPeriodicTask.run(ScheduledDirectPeriodicTask.java:39)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

所以我认为这与缓冲区的下游工作有关。

但是,无论我在 doOnNext 中阻塞多长时间,我都无法重现问题。示例输出:

main: Will send
    RxComputationThreadPool-1: Buffered: 197 items
        RxCachedThreadScheduler-1: Waiting: 197
    RxComputationThreadPool-1: Buffered: 196 items
    RxComputationThreadPool-1: Buffered: 197 items
    RxComputationThreadPool-1: Buffered: 197 items
    RxComputationThreadPool-1: Buffered: 196 items
    RxComputationThreadPool-1: Buffered: 197 items
        onNext: 197
        RxCachedThreadScheduler-1: Waiting: 196
    RxComputationThreadPool-1: Buffered: 197 items
    RxComputationThreadPool-1: Buffered: 197 items
    RxComputationThreadPool-1: Buffered: 196 items
    RxComputationThreadPool-1: Buffered: 197 items
    RxComputationThreadPool-1: Buffered: 197 items
        onNext: 196
        RxCachedThreadScheduler-1: Waiting: 197
    RxComputationThreadPool-1: Buffered: 197 items
    RxComputationThreadPool-1: Buffered: 197 items
...

我原以为 Thread.sleep(5000) 需要这么长时间,我们会收到回压。

有没有办法模拟这个,最好是在使用 TestScheduler/TestSubscriber 的测试中(避免 Thread.sleep()s)?

我能够通过增加事件发出的速率、增加事件的最大数量并降低消费者处理它们的速率来重现 MissingBackpressureException。

溢出的缓冲区是大小为 128 的默认 observeOn(...) 运算符缓冲区。由于它每秒接收一次新列表,因此至少需要几分钟的反压才能溢出。

请注意,您可以通过将其作为参数传递给 observeOn(...) 来覆盖此默认缓冲区大小。

回到背压处理,我认为您的管道的主要问题是 buffer(1, TimeUnit.SECONDS) 运算符。如果您查看 javadoc:

Backpressure:This operator does not support backpressure as it uses time. It requests Long.MAX_VALUE upstream and does not obey downstream requests.

由于上述原因,您的 onBackPressureDrop(...) 永远不会被调用。我想你可以通过在 buffer(...) 之后放置 onBackPressureDrop(...) 来解决这个问题。这样做会产生 Backpressure, dropping... 消息。

您应该能够使用以下方法对此进行单元测试: TestScheduler.advanceTimeBy(long, TimeUnit)。虽然我不得不承认,我还没有尝试过。