rxJava。帮助理解发布和取消订阅的工作原理

rxJava. Help to understand how publish and unsubscribe work

我在将 publish()observeOnsubscribeOn 结合使用时观察到奇怪的行为。请看下面的例子。

代码:

ConnectableObservable<String> observable = Observable.create(
    new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            int i=0;
            Log.d("testTag:", "start call");
            while (!subscriber.isUnsubscribed()) {
                subscriber.onNext("item "+i);
                i++;
            }
            Log.d("testTag:", "completed");
            subscriber.onCompleted();
        }
    }
).publish();

observable
    .take(10)
    .subscribe(
        new Action1<String>() {
            @Override
            public void call(String s) {
                Log.d("testTag:", "item received 1 : " + String.valueOf(s));
            }
        });

observable.connect();

输出:

start call
item received 1 : item 0
item received 1 : item 1
item received 1 : item 2
item received 1 : item 3
item received 1 : item 4
item received 1 : item 5
item received 1 : item 6
item received 1 : item 7
item received 1 : item 8
item received 1 : item 9

条件!subscriber.isUnsubscribed()从未发生。这完全没问题。当我添加 observeOn(Schedulers.newThread())subscribeOn(Schedulers.newThread()) 时会发生奇怪的事情。看看:

ConnectableObservable<String> observable = Observable.create(
    new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            int i=0;
            Log.d("testTag:", "start call");
            while (!subscriber.isUnsubscribed()) {
                subscriber.onNext("item "+i);
                i++;
            }
            Log.d("testTag:", "completed");
            subscriber.onCompleted();
        }
    }
)
.observeOn(Schedulers.newThread())
.subscribeOn(Schedulers.newThread())
.publish();

observable
    .take(10)
    .subscribe(
        new Action1<String>() {
            @Override
            public void call(String s) {
                Log.d("testTag:", "item received 1 : " + String.valueOf(s));
            }
        });

observable.connect();

输出:

start call
item received 1 : item 0
item received 1 : item 1
item received 1 : item 2
item received 1 : item 3
item received 1 : item 4
item received 1 : item 5
item received 1 : item 6
item received 1 : item 7
item received 1 : item 8
item received 1 : item 9
completed

请帮我理解为什么条件 !subscriber.isUnsubscribed() 变成 true.

PS。我知道如果我想检查 !subscriber.isUnsubscribed() 我应该使用 .publish().refCount() 而不是 connect()。我的目标是了解当前行为。

publish() 通过消耗源流直到完成(这里从未发生过)对 0 个订阅者作出反应。因此,一旦您使用唯一的 subscribe() 获取了 10,源将永远保留 运行 或直到您在 connect() 调用返回的 Subscription 上调用 unsubscribe

在第二种情况下,你得到 !subscriber.isUnsubscribed() true 因为你溢出了 observeOn 或发布的内部队列并且整个链因 MissingBackpressureException 而死,这触发了到达你的取消订阅最终可观察到。

你需要的是 share() 而不是 publish() 这样如果所有订阅者都离开你的源就会被终止,但请注意因为你的 Observable 不考虑背压,你仍然容易 MissingBackpressureException.

一般来说,我建议先查看标准运算符和 Observable 工厂而不是滚动你的 Observable,或者查看 AbstractOnSubscribe 及其 javadoc 中的示例以了解如何实现背压感知的自定义 Observable。