订阅者如何通过反应性拉背压控制发布者?

How can a subscriber control a Publisher with reactive pull backpressure?

我有一个发布者,其发布速度可能比订阅者处理数据的速度快。为了解决这个问题,我开始使用背压。因为我不想丢弃任何数据,所以我使用反应式拉背压。我将此理解为订阅者能够告诉发布者何时发布更多数据,如 this and the follwing paragraphs 中所述。

发布者是一个Flowable,它以异步方式并行工作,然后合并到一个顺序Flowable中。数据最多应缓冲 10 个元素,当此缓冲区已满时,Flowable 不应再发布任何数据并等待下一个请求。

订阅者是一个 DisposableSubscriber,它在开始时请求 10 个项目。每个消耗的项目都需要一些计算,然后将请求一个新项目。

我的 MWE 看起来像这样:

List<Integer> src = new ArrayList<>();
for (int i = 0; i < 200; i++) {
    src.add(i);
}
Flowable.fromIterable(src)
        .parallel(10, 1)
        .runOn(Schedulers.from(Executors.newFixedThreadPool(10)))
        .flatMap(i -> Single.fromCallable(() -> {
            System.out.println("publisher: " + i);
            Thread.sleep(200);
            return i;
        }).toFlowable())
        .sequential(1)
        .onBackpressureBuffer(10)
        .observeOn(Schedulers.newThread())
        .subscribeOn(Schedulers.newThread())
        .doOnError(Throwable::printStackTrace)
        .subscribeWith(new DisposableSubscriber<Integer>() {
            @Override
            protected void onStart() {
                request(10);
            }
            @Override
            public void onNext(Integer integer) {
                System.out.println("subscriber: " + integer);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                request(1);
            }
            @Override
            public void onError(Throwable t) {
            }
            @Override
            public void onComplete() {
            }
        });
try {
    Thread.sleep(1000000);
} catch (InterruptedException e) {
    e.printStackTrace();
}

我希望此代码执行以下操作:订户请求前 10 个项目。发布者发布前 10 个项目。然后订阅者在 onNext 中进行计算并请求更多项目,发布者将发布这些项目。

实际情况:起初,发布者似乎无限制地发布项目。在某些时候,例如在 14 个已发布的项目之后,订阅者处理它的第一个项目。发生这种情况时,发布者会继续发布项目。在大约 30 个已发布项目后,抛出 io.reactivex.exceptions.MissingBackpressureException: Buffer is full 并且流结束。

我的问题:我做错了什么? 我怎样才能让订阅者控制发布者是否以及何时发布数据?显然,我做错了一些可怕的事情。否则,期望不会与现实如此不同。

上述 MWE 的示例输出:

publisher: 5
publisher: 7
publisher: 8
publisher: 0
publisher: 2
publisher: 6
publisher: 9
publisher: 3
publisher: 4
publisher: 1
publisher: 18
publisher: 17
publisher: 15
subscriber: 0
publisher: 11
publisher: 10
publisher: 19
publisher: 13
publisher: 14
publisher: 12
publisher: 16
publisher: 27
publisher: 28
publisher: 23
publisher: 21
publisher: 29
publisher: 20
publisher: 25
publisher: 22
publisher: 26
io.reactivex.exceptions.MissingBackpressureException: Buffer is full

不是 Rx 方面的专家,但让我试一试。observeOn(...) 有它自己的默认缓冲区大小 128。所以,从一开始它就会从上游请求比你的请求更多缓冲区可以容纳。

observeOn(...) 接受可选的缓冲区大小覆盖,但即使您提供它,ParallelFlowable 也会比您想要的更频繁地调用您的 flatMap(...) 方法。我不是 100% 确定为什么,也许它有自己的内部缓冲,它在将 rails 合并回顺序时执行。

我认为您可以通过使用 flatMap(...) 而不是 parralel(...) 来更接近您想要的行为,并提供 maxConcurrency 参数。

要记住的另一件事是你不想调用 subscribeOn(...) - 它意味着影响整个上游 Flowable。因此,如果您已经在调用 parallel(...).runOn(...),则它没有任何效果,或者效果会出乎意料。

有了以上内容,我认为这会让您更接近您要找的东西:

    List<Integer> src = new ArrayList<>();
    for (int i = 0; i < 200; i++) {
        src.add(i);
    }
    Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(10));
    Flowable.fromIterable(src)
            .flatMap(
                    i -> Flowable.just( i )
                            .subscribeOn(scheduler) // here subscribeOn(...) affects just this nested Flowable
                            .map( __ -> {
                                System.out.println("publisher: " + i);
                                Thread.sleep(200);
                                return i;
                            } ),
                    10) // max concurrency
            .observeOn(Schedulers.newThread(), false, 10) // override buffer size
            .doOnError(Throwable::printStackTrace)
            .subscribeWith(new DisposableSubscriber<Integer>() {
                @Override
                protected void onStart() {
                    request(10);
                }
                @Override
                public void onNext(Integer integer) {
                    System.out.println("subscriber: " + integer);
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    request(1);
                }
                @Override
                public void onError(Throwable t) {
                }
                @Override
                public void onComplete() {
                }
            });
    try {
        Thread.sleep(1000000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }