rxJava。帮助理解发布和取消订阅的工作原理
rxJava. Help to understand how publish and unsubscribe work
我在将 publish()
与 observeOn
和 subscribeOn
结合使用时观察到奇怪的行为。请看下面的例子。
代码:
ConnectableObservable<String> observable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
int i=0;
Log.d("testTag:", "start call");
while (!subscriber.isUnsubscribed()) {
subscriber.onNext("item "+i);
i++;
}
Log.d("testTag:", "completed");
subscriber.onCompleted();
}
}
).publish();
observable
.take(10)
.subscribe(
new Action1<String>() {
@Override
public void call(String s) {
Log.d("testTag:", "item received 1 : " + String.valueOf(s));
}
});
observable.connect();
输出:
start call
item received 1 : item 0
item received 1 : item 1
item received 1 : item 2
item received 1 : item 3
item received 1 : item 4
item received 1 : item 5
item received 1 : item 6
item received 1 : item 7
item received 1 : item 8
item received 1 : item 9
条件!subscriber.isUnsubscribed()
从未发生。这完全没问题。当我添加 observeOn(Schedulers.newThread())
和 subscribeOn(Schedulers.newThread())
时会发生奇怪的事情。看看:
ConnectableObservable<String> observable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
int i=0;
Log.d("testTag:", "start call");
while (!subscriber.isUnsubscribed()) {
subscriber.onNext("item "+i);
i++;
}
Log.d("testTag:", "completed");
subscriber.onCompleted();
}
}
)
.observeOn(Schedulers.newThread())
.subscribeOn(Schedulers.newThread())
.publish();
observable
.take(10)
.subscribe(
new Action1<String>() {
@Override
public void call(String s) {
Log.d("testTag:", "item received 1 : " + String.valueOf(s));
}
});
observable.connect();
输出:
start call
item received 1 : item 0
item received 1 : item 1
item received 1 : item 2
item received 1 : item 3
item received 1 : item 4
item received 1 : item 5
item received 1 : item 6
item received 1 : item 7
item received 1 : item 8
item received 1 : item 9
completed
请帮我理解为什么条件 !subscriber.isUnsubscribed()
变成 true
.
PS。我知道如果我想检查 !subscriber.isUnsubscribed()
我应该使用 .publish().refCount()
而不是 connect()
。我的目标是了解当前行为。
publish()
通过消耗源流直到完成(这里从未发生过)对 0 个订阅者作出反应。因此,一旦您使用唯一的 subscribe() 获取了 10,源将永远保留 运行 或直到您在 connect() 调用返回的 Subscription
上调用 unsubscribe
。
在第二种情况下,你得到 !subscriber.isUnsubscribed() true 因为你溢出了 observeOn 或发布的内部队列并且整个链因 MissingBackpressureException
而死,这触发了到达你的取消订阅最终可观察到。
你需要的是 share() 而不是 publish() 这样如果所有订阅者都离开你的源就会被终止,但请注意因为你的 Observable 不考虑背压,你仍然容易 MissingBackpressureException
.
一般来说,我建议先查看标准运算符和 Observable 工厂而不是滚动你的 Observable,或者查看 AbstractOnSubscribe
及其 javadoc 中的示例以了解如何实现背压感知的自定义 Observable。
我在将 publish()
与 observeOn
和 subscribeOn
结合使用时观察到奇怪的行为。请看下面的例子。
代码:
ConnectableObservable<String> observable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
int i=0;
Log.d("testTag:", "start call");
while (!subscriber.isUnsubscribed()) {
subscriber.onNext("item "+i);
i++;
}
Log.d("testTag:", "completed");
subscriber.onCompleted();
}
}
).publish();
observable
.take(10)
.subscribe(
new Action1<String>() {
@Override
public void call(String s) {
Log.d("testTag:", "item received 1 : " + String.valueOf(s));
}
});
observable.connect();
输出:
start call
item received 1 : item 0
item received 1 : item 1
item received 1 : item 2
item received 1 : item 3
item received 1 : item 4
item received 1 : item 5
item received 1 : item 6
item received 1 : item 7
item received 1 : item 8
item received 1 : item 9
条件!subscriber.isUnsubscribed()
从未发生。这完全没问题。当我添加 observeOn(Schedulers.newThread())
和 subscribeOn(Schedulers.newThread())
时会发生奇怪的事情。看看:
ConnectableObservable<String> observable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
int i=0;
Log.d("testTag:", "start call");
while (!subscriber.isUnsubscribed()) {
subscriber.onNext("item "+i);
i++;
}
Log.d("testTag:", "completed");
subscriber.onCompleted();
}
}
)
.observeOn(Schedulers.newThread())
.subscribeOn(Schedulers.newThread())
.publish();
observable
.take(10)
.subscribe(
new Action1<String>() {
@Override
public void call(String s) {
Log.d("testTag:", "item received 1 : " + String.valueOf(s));
}
});
observable.connect();
输出:
start call
item received 1 : item 0
item received 1 : item 1
item received 1 : item 2
item received 1 : item 3
item received 1 : item 4
item received 1 : item 5
item received 1 : item 6
item received 1 : item 7
item received 1 : item 8
item received 1 : item 9
completed
请帮我理解为什么条件 !subscriber.isUnsubscribed()
变成 true
.
PS。我知道如果我想检查 !subscriber.isUnsubscribed()
我应该使用 .publish().refCount()
而不是 connect()
。我的目标是了解当前行为。
publish()
通过消耗源流直到完成(这里从未发生过)对 0 个订阅者作出反应。因此,一旦您使用唯一的 subscribe() 获取了 10,源将永远保留 运行 或直到您在 connect() 调用返回的 Subscription
上调用 unsubscribe
。
在第二种情况下,你得到 !subscriber.isUnsubscribed() true 因为你溢出了 observeOn 或发布的内部队列并且整个链因 MissingBackpressureException
而死,这触发了到达你的取消订阅最终可观察到。
你需要的是 share() 而不是 publish() 这样如果所有订阅者都离开你的源就会被终止,但请注意因为你的 Observable 不考虑背压,你仍然容易 MissingBackpressureException
.
一般来说,我建议先查看标准运算符和 Observable 工厂而不是滚动你的 Observable,或者查看 AbstractOnSubscribe
及其 javadoc 中的示例以了解如何实现背压感知的自定义 Observable。