传递覆盖 onSubscribe 的订阅者时调用 dispose()
Calling dispose() when passing a subscriber that overrides onSubscribe
我是 RxJava 的新手,如果我理解正确的话,Observer
在 onSubscribe
上传递给了 Disposable
,所以如果 dispose()
它可以手动停止处理已经调用了。
我创建了以下代码:
@NonNull Observable<Long> src = Observable.interval(1, TimeUnit.SECONDS);
src.subscribe(new Observer<Long>() {
private Disposable d;
@Override
public void onSubscribe(@NonNull Disposable d) {
this.d = d;
}
@Override
public void onNext(@NonNull Long aLong) {
if(!d.isDisposed()) {
System.out.println("Number onNext = " + aLong);
}
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
System.out.println("completed");
}
});
但我不知道如何为该订阅调用 dispose()
。 subscribe
将 Observer
作为参数传递 returns void
和 subscribeWith
不接受我的 Observer
没有编译错误。
这应该如何运作?我在这里误解了什么?
您可以使用 DisposableObserver,它可以在您完成观察后轻松处理。
@NonNull Observable<Long> src = Observable.interval(1, TimeUnit.SECONDS);
src.subscribe(new DisposableObserver<Long>() {
@Override
public void onNext(@NotNull Long aLong) {
//Do anything you want to do..
dispose();
}
@Override
public void onError(@NotNull Throwable e) {
//Handle the errors here..
dispose();
}
@Override
public void onComplete() {
dispose();
}
});
您还可以使用 CompositeDisposable 一次处置多个观察者,有关详细信息,请查看此内容。
https://www.tutorialspoint.com/rxjava/rxjava_compositedisposable.htm
Observable
的JavaDocs有一个简单的例子:
Disposable d = Observable.just("Hello world!")
.delay(1, TimeUnit.SECONDS)
.subscribeWith(new DisposableObserver<String>() {
@Override public void onStart() {
System.out.println("Start!");
}
@Override public void onNext(String t) {
System.out.println(t);
}
@Override public void onError(Throwable t) {
t.printStackTrace();
}
@Override public void onComplete() {
System.out.println("Done!");
}
});
Thread.sleep(500);
// the sequence can now be disposed via dispose()
d.dispose();
编辑
以下示例是从 onSubscribe
方法中获取 Disposable
的方法,但通常不推荐使用:
// field in the owner class
Disposable disposable;
public void doReactive() {
Observable<Long> src = Observable.interval(1, TimeUnit.SECONDS);
src.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
disposable = d;
}
// ...
});
}
public void cleanup() {
if (disposable != null) {
disposable.dispose();
disposable = null;
}
}
或
SerialDisposable sd = new SerialDisposable();
Observable<Long> src = Observable.interval(1, TimeUnit.SECONDS);
src.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
sd.set(d);
}
// ...
});
// ...
sd.dispose();
我是 RxJava 的新手,如果我理解正确的话,Observer
在 onSubscribe
上传递给了 Disposable
,所以如果 dispose()
它可以手动停止处理已经调用了。
我创建了以下代码:
@NonNull Observable<Long> src = Observable.interval(1, TimeUnit.SECONDS);
src.subscribe(new Observer<Long>() {
private Disposable d;
@Override
public void onSubscribe(@NonNull Disposable d) {
this.d = d;
}
@Override
public void onNext(@NonNull Long aLong) {
if(!d.isDisposed()) {
System.out.println("Number onNext = " + aLong);
}
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
System.out.println("completed");
}
});
但我不知道如何为该订阅调用 dispose()
。 subscribe
将 Observer
作为参数传递 returns void
和 subscribeWith
不接受我的 Observer
没有编译错误。
这应该如何运作?我在这里误解了什么?
您可以使用 DisposableObserver,它可以在您完成观察后轻松处理。
@NonNull Observable<Long> src = Observable.interval(1, TimeUnit.SECONDS);
src.subscribe(new DisposableObserver<Long>() {
@Override
public void onNext(@NotNull Long aLong) {
//Do anything you want to do..
dispose();
}
@Override
public void onError(@NotNull Throwable e) {
//Handle the errors here..
dispose();
}
@Override
public void onComplete() {
dispose();
}
});
您还可以使用 CompositeDisposable 一次处置多个观察者,有关详细信息,请查看此内容。
https://www.tutorialspoint.com/rxjava/rxjava_compositedisposable.htm
Observable
的JavaDocs有一个简单的例子:
Disposable d = Observable.just("Hello world!")
.delay(1, TimeUnit.SECONDS)
.subscribeWith(new DisposableObserver<String>() {
@Override public void onStart() {
System.out.println("Start!");
}
@Override public void onNext(String t) {
System.out.println(t);
}
@Override public void onError(Throwable t) {
t.printStackTrace();
}
@Override public void onComplete() {
System.out.println("Done!");
}
});
Thread.sleep(500);
// the sequence can now be disposed via dispose()
d.dispose();
编辑
以下示例是从 onSubscribe
方法中获取 Disposable
的方法,但通常不推荐使用:
// field in the owner class
Disposable disposable;
public void doReactive() {
Observable<Long> src = Observable.interval(1, TimeUnit.SECONDS);
src.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
disposable = d;
}
// ...
});
}
public void cleanup() {
if (disposable != null) {
disposable.dispose();
disposable = null;
}
}
或
SerialDisposable sd = new SerialDisposable();
Observable<Long> src = Observable.interval(1, TimeUnit.SECONDS);
src.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
sd.set(d);
}
// ...
});
// ...
sd.dispose();