RxJava:OnNext 取消订阅不起作用
RxJava: OnNext Unsubscribe is not working
我在收到来自 observable 的第一项后尝试取消订阅。它似乎不起作用。我做错了什么?
public class ObservableAndSubscriber {
public static void main(final String[] args) {
final Observable<String> strObservable = Observable.create(s -> {
while (true) {
s.onNext("Hello World!!");
}
});
final Subscriber<String> strSubscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(final Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(final String t) {
System.out.println(t);
this.unsubscribe();
}
};
strObservable.subscribe(strSubscriber);
}
}
结果似乎在无限循环中打印 "Hello World"。
我在阅读 Tomasz Nurkiewicz 的书时做了这个例子。他后来有一个类似的例子,并解释了我的代码有什么问题。
无限循环是邪恶的!因为,我在可观察的 create lambda 表达式中有一个,它在我的主线程和无限期订阅块的上下文中执行。为了让 observable create lambda 在不同的线程中执行,这意味着主线程永远不会被阻塞并且 subscribe() 完成。我要求订阅发生在 IO 线程上,这成功了,这意味着无限循环发生在 IO 线程上并且不会阻塞主线程。
strObservable.subscribeOn(Schedulers.io()).subscribe(strSubscriber);
要取消订阅才能正常工作,您需要在循环中检查订阅状态。否则它会 运行 无限耗尽你的 CPU.
处理无限序列的最简单方法是利用库提供的方法 Observable.generate()
或 Observable.fromIterable()
。他们会为您进行适当的检查。
还要确保您在不同的线程上订阅和观察。否则,您将只能为单个订阅者提供服务。
你的例子:
strObservable = Observable.generate(g -> g.onNext("Hello!"));
strObservable.subscribeOn(Schedulers.newThread()).subscribe(strSubscriber);
我在收到来自 observable 的第一项后尝试取消订阅。它似乎不起作用。我做错了什么?
public class ObservableAndSubscriber {
public static void main(final String[] args) {
final Observable<String> strObservable = Observable.create(s -> {
while (true) {
s.onNext("Hello World!!");
}
});
final Subscriber<String> strSubscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(final Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(final String t) {
System.out.println(t);
this.unsubscribe();
}
};
strObservable.subscribe(strSubscriber);
}
}
结果似乎在无限循环中打印 "Hello World"。
我在阅读 Tomasz Nurkiewicz 的书时做了这个例子。他后来有一个类似的例子,并解释了我的代码有什么问题。
无限循环是邪恶的!因为,我在可观察的 create lambda 表达式中有一个,它在我的主线程和无限期订阅块的上下文中执行。为了让 observable create lambda 在不同的线程中执行,这意味着主线程永远不会被阻塞并且 subscribe() 完成。我要求订阅发生在 IO 线程上,这成功了,这意味着无限循环发生在 IO 线程上并且不会阻塞主线程。
strObservable.subscribeOn(Schedulers.io()).subscribe(strSubscriber);
要取消订阅才能正常工作,您需要在循环中检查订阅状态。否则它会 运行 无限耗尽你的 CPU.
处理无限序列的最简单方法是利用库提供的方法 Observable.generate()
或 Observable.fromIterable()
。他们会为您进行适当的检查。
还要确保您在不同的线程上订阅和观察。否则,您将只能为单个订阅者提供服务。
你的例子:
strObservable = Observable.generate(g -> g.onNext("Hello!"));
strObservable.subscribeOn(Schedulers.newThread()).subscribe(strSubscriber);