RxJava - 如何设置观察者阻塞
RxJava - How to set Observer to block
我希望我的 Observable
阻塞直到操作完成,然后继续下一个方法调用等。看看这段代码:
import rx.Observable;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.functions.Func1;
Observable observer1 = Observable.just(1, 2, 3)
.observeOn(AndroidSchedulers.mainThread());
Observable observer2 = observer1.map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer myint) {
//multiples each int by 2
return myint * 2;
}
});
observer2.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.mainThread());
observer2.subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
System.out.println("this is the Integer multiplied by two:" + i);
}
});
System.out.println("I want this statement to come after multiplication completes");
我知道我可以使用 onComplete
回调,但这不是我的意思。我想弄清楚如何阻止观察者直到它完成,然后继续我的其余代码。此时日志看起来像这样:
I/System.out﹕ I want this statement to come after multiplication completes
I/System.out﹕ this is the Integer multiplied by two:2
I/System.out﹕ this is the Integer multiplied by two:4
I/System.out﹕ this is the Integer multiplied by two:6
还要注意我是如何观察和订阅 MainThread 上的所有内容的,如果我不指定,这是默认完成的吗?
如果您想阻塞直到 Observable 完成,请使用 observable.toBlocking().forEach()
而不是 subscribe()
。
observer2
.toBlocking()
.forEach(new Action1<Integer>() {
@Override
public void call(Integer i) {
System.out.println("this is the Integer multiplied by two:" + i);
}
});
除了 forEach()
之外,还有许多 Blocking Observable Operators 可以用来获得所需的效果。例如,如果您只需要发出的第一项,则使用 observable.toBlocking().first()
此外,请注意 RxJava API returns 为您进行的每个调用提供一个新的 Observable。因此,以下行对 observable2 使用的调度程序没有影响。
observer2.observeOn(AndroidSchedulers.mainThread()).subscribeOn(AndroidSchedulers.mainThread());
它确实使用指定的调度程序创建了一个新的 Observable,但由于返回的 Observable 未分配给任何变量,因此将其丢弃。您可以改为执行以下操作。
observer2
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.mainThread())
.toBlocking()
.forEach(new Action1<Integer>() {
@Override
public void call(Integer i) {
System.out.println("this is the Integer multiplied by two:" + i);
}
});
我希望我的 Observable
阻塞直到操作完成,然后继续下一个方法调用等。看看这段代码:
import rx.Observable;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.functions.Func1;
Observable observer1 = Observable.just(1, 2, 3)
.observeOn(AndroidSchedulers.mainThread());
Observable observer2 = observer1.map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer myint) {
//multiples each int by 2
return myint * 2;
}
});
observer2.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.mainThread());
observer2.subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
System.out.println("this is the Integer multiplied by two:" + i);
}
});
System.out.println("I want this statement to come after multiplication completes");
我知道我可以使用 onComplete
回调,但这不是我的意思。我想弄清楚如何阻止观察者直到它完成,然后继续我的其余代码。此时日志看起来像这样:
I/System.out﹕ I want this statement to come after multiplication completes
I/System.out﹕ this is the Integer multiplied by two:2
I/System.out﹕ this is the Integer multiplied by two:4
I/System.out﹕ this is the Integer multiplied by two:6
还要注意我是如何观察和订阅 MainThread 上的所有内容的,如果我不指定,这是默认完成的吗?
如果您想阻塞直到 Observable 完成,请使用 observable.toBlocking().forEach()
而不是 subscribe()
。
observer2
.toBlocking()
.forEach(new Action1<Integer>() {
@Override
public void call(Integer i) {
System.out.println("this is the Integer multiplied by two:" + i);
}
});
除了 forEach()
之外,还有许多 Blocking Observable Operators 可以用来获得所需的效果。例如,如果您只需要发出的第一项,则使用 observable.toBlocking().first()
此外,请注意 RxJava API returns 为您进行的每个调用提供一个新的 Observable。因此,以下行对 observable2 使用的调度程序没有影响。
observer2.observeOn(AndroidSchedulers.mainThread()).subscribeOn(AndroidSchedulers.mainThread());
它确实使用指定的调度程序创建了一个新的 Observable,但由于返回的 Observable 未分配给任何变量,因此将其丢弃。您可以改为执行以下操作。
observer2
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.mainThread())
.toBlocking()
.forEach(new Action1<Integer>() {
@Override
public void call(Integer i) {
System.out.println("this is the Integer multiplied by two:" + i);
}
});