RxJava3。为什么 FlowableSubscriber onNext 没有被调用?

RxJava3. Why FlowableSubscriber onNext has not been called?

我需要获取订阅对象才能有机会取消订阅听众。为此,我想提供一个 FlowableSubscriber 来运行。

代码:

FlowableSubscriber fs = new FlowableSubscriber() {
        @Override
        public void onSubscribe(@NonNull Subscription s) {
            System.out.println("Flowable onSubs");
        }

        @Override
        public void onNext(Object o) {
            System.out.println("Flowable onNext");
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("Flowable onErr");
        }

        @Override
        public void onComplete() {
            System.out.println("Flowable onComlet");
        }
    };

日志是:

Running...
Flowable onSubs

如果我使用 lambdas 它可以工作,但没有 onSubscribe 回调。

如何获得订阅以及为什么这些方法没有被调用?

由于 Flowable 支持背压,您必须通过在 Subscription 上调用 request 方法来实际控制可以消耗的项目数量,以便 [=11= 可以发出它们] :

FlowableSubscriber fs = new FlowableSubscriber() {

        private Subscription subscription;

        @Override
        public void onSubscribe(Subscription s) {
            System.out.println("Flowable onSubs");
            subscription=s;
            subscription.request(Integer.MAX_VALUE);
        }

        @Override
        public void onNext(Object o) {
           System.out.println("Flowable onNext");
           //subscription.request(1); you can also request more items from onNext method - it is up to you
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("Flowable onErr");
        }

        @Override
        public void onComplete() {
            System.out.println("Flowable onComlet");
        }
};

在示例中,Integer.MAX_VALUE 请求订阅,但这可能不是最好的主意。问题是你应该从 onSubscribe 调用 Subscription::request 来请求初始项目,然后从 onNext 调用它并决定你可以实际处理多少项目。