Observable.take() causing NPE
I have a simple program like this:
public class MainApp {

    public static void main(String[] args) {
        getAcronyms()
                .flatMap(Observable::fromIterable)
                .flatMap(MainApp::getTitle)
                .filter(Objects::nonNull)
                .subscribe(System.out::println);
    }

    private static Observable<List<String>> getAcronyms() {
        List<String> strings = new ArrayList<>();
        strings.add("YOLO");
        strings.add("LMAO");
        strings.add("ROFL");
        strings.add("AYY LMAO");
        return new Observable<List<String>>() {
            @Override
            protected void subscribeActual(Observer<? super List<String>> observer) {
                observer.onNext(strings);
                observer.onComplete();
            }
        };
    }

    private static Observable<String> getTitle(String url) {
        return new Observable<String>() {
            @Override
            protected void subscribeActual(Observer<? super String> observer) {
                observer.onNext(url + " title!");
                observer.onComplete();
            }
        };
    }
}
This works fine, but when I chain take:
getAcronyms()
        .flatMap(Observable::fromIterable)
        .flatMap(MainApp::getTitle)
        .filter(Objects::nonNull)
        .take(2)
        .subscribe(System.out::println);
It prints the 2 values, but then throws an NPE:
YOLO title!
LMAO title!
Exception in thread "main" java.lang.NullPointerException
    at io.reactivex.internal.operators.observable.ObservableTake$TakeObserver.onComplete(ObservableTake.java:83)
    at io.reactivex.internal.operators.observable.ObservableTake$TakeObserver.onNext(ObservableTake.java:64)
    at io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver.onNext(ObservableFilter.java:52)
    at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.tryEmit(ObservableFlatMap.java:262)
    at io.reactivex.internal.operators.observable.ObservableFlatMap$InnerObserver.onNext(ObservableFlatMap.java:559)
    at MainApp.subscribeActual(MainApp.java:41)
    at io.reactivex.Observable.subscribe(Observable.java:10842)
    at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.subscribeInner(ObservableFlatMap.java:162)
    at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:139)
    at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.drainLoop(ObservableFlatMap.java:436)
    at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.drain(ObservableFlatMap.java:323)
    at io.reactivex.internal.operators.observable.ObservableFlatMap$InnerObserver.onSubscribe(ObservableFlatMap.java:546)
    at io.reactivex.internal.operators.observable.ObservableFromIterable.subscribeActual(ObservableFromIterable.java:55)
    at io.reactivex.Observable.subscribe(Observable.java:10842)
    at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.subscribeInner(ObservableFlatMap.java:162)
    at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:139)
    at MainApp.subscribeActual(MainApp.java:31)
    at io.reactivex.Observable.subscribe(Observable.java:10842)
    at io.reactivex.internal.operators.observable.ObservableFlatMap.subscribeActual(ObservableFlatMap.java:55)
    at io.reactivex.Observable.subscribe(Observable.java:10842)
    at io.reactivex.internal.operators.observable.ObservableFlatMap.subscribeActual(ObservableFlatMap.java:55)
    at io.reactivex.Observable.subscribe(Observable.java:10842)
    at io.reactivex.internal.operators.observable.ObservableFilter.subscribeActual(ObservableFilter.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10842)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10842)
    at io.reactivex.Observable.subscribe(Observable.java:10828)
    at io.reactivex.Observable.subscribe(Observable.java:10731)
    at MainApp.main(MainApp.java:18)
Can anyone help me figure out why this happens and what I am doing wrong?
This exception occurs because take, after emitting the defined number of items, internally tries to dispose a Disposable object that was never set.
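To make the failure concrete, here is a minimal plain-Java model of the situation. It is a sketch under stated assumptions, not RxJava's actual source: the nested Disposable, Observer and TakeObserver types are simplified, hypothetical stand-ins that mirror only the shape visible in the stack trace (onNext calling onComplete, which disposes the upstream).

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model only; these nested types are hypothetical stand-ins, not RxJava classes.
final class TakeModel {

    interface Disposable {
        void dispose();
    }

    interface Observer<T> {
        void onSubscribe(Disposable d);
        void onNext(T value);
        void onComplete();
    }

    // Models the relevant part of take(n): remember the upstream Disposable
    // and dispose it once n items have passed through.
    static final class TakeObserver<T> implements Observer<T> {
        private final Observer<T> downstream;
        private long remaining;
        private Disposable upstream; // stays null if the source never calls onSubscribe
        private boolean done;

        TakeObserver(Observer<T> downstream, long limit) {
            this.downstream = downstream;
            this.remaining = limit;
        }

        @Override
        public void onSubscribe(Disposable d) {
            upstream = d;
            downstream.onSubscribe(d);
        }

        @Override
        public void onNext(T value) {
            if (done) {
                return;
            }
            downstream.onNext(value);
            if (--remaining == 0) {
                onComplete();
            }
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            upstream.dispose(); // NullPointerException when upstream was never set
            downstream.onComplete();
        }
    }

    // Emits four titles through take(2); the flag controls whether the source
    // follows the protocol and calls onSubscribe before emitting.
    static List<String> run(boolean callOnSubscribe) {
        List<String> received = new ArrayList<>();
        Observer<String> collector = new Observer<String>() {
            @Override public void onSubscribe(Disposable d) { }
            @Override public void onNext(String value) { received.add(value); }
            @Override public void onComplete() { }
        };
        TakeObserver<String> take = new TakeObserver<>(collector, 2);
        boolean[] disposed = {false};
        if (callOnSubscribe) {
            take.onSubscribe(() -> disposed[0] = true);
        }
        for (String s : new String[] {"YOLO", "LMAO", "ROFL", "AYY LMAO"}) {
            if (disposed[0]) {
                break; // a well-behaved source stops once it has been disposed
            }
            take.onNext(s + " title!");
        }
        take.onComplete();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(true)); // prints [YOLO title!, LMAO title!]
        try {
            run(false);
        } catch (NullPointerException e) {
            System.out.println("NPE: upstream Disposable was never set");
        }
    }
}
```

In the model, run(false) delivers both values and only then fails, which matches the output in the question: the second item is what triggers the dispose call.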
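Therefore, you have to provide one when creating the initial Observable, by calling observer.onSubscribe(disposable) in your subscribeActual implementation. But don't reinvent the wheel: creating an Observable by calling its public constructor is intended for custom operators. Just use the static factory methods. In your case, the best option is Observable.fromCallable: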
private static Observable<List<String>> getAcronyms() {
    return Observable.fromCallable(new Callable<List<String>>() {
        @Override
        public List<String> call() throws Exception {
            List<String> strings = new ArrayList<>();
            strings.add("YOLO");
            strings.add("LMAO");
            strings.add("ROFL");
            strings.add("AYY LMAO");
            return strings;
        }
    });
}
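For comparison, here is a hedged sketch of the two other routes mentioned above, assuming RxJava 2 (the io.reactivex packages seen in the stack trace) is on the classpath. The class name TakeFixSketch and the helper firstTwoTitles are made up for illustration. getAcronyms keeps the hand-written subclass but passes Disposables.empty() to onSubscribe before emitting, and getTitle, which has the same hand-rolled pattern in the question, is replaced with Observable.just.

```java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposables;

import java.util.Arrays;
import java.util.List;

// Illustrative sketch; TakeFixSketch and firstTwoTitles are hypothetical names.
class TakeFixSketch {

    // Manual variant: hand the observer a Disposable before emitting anything.
    // A production source would also check that Disposable before each emission.
    static Observable<List<String>> getAcronyms() {
        List<String> strings = Arrays.asList("YOLO", "LMAO", "ROFL", "AYY LMAO");
        return new Observable<List<String>>() {
            @Override
            protected void subscribeActual(Observer<? super List<String>> observer) {
                observer.onSubscribe(Disposables.empty());
                observer.onNext(strings);
                observer.onComplete();
            }
        };
    }

    // Factory variant: just() takes care of the subscription protocol itself.
    static Observable<String> getTitle(String url) {
        return Observable.just(url + " title!");
    }

    // Same chain as in the question, collected into a list instead of printed.
    static List<String> firstTwoTitles() {
        return getAcronyms()
                .flatMap(Observable::fromIterable)
                .flatMap(TakeFixSketch::getTitle)
                .take(2)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        firstTwoTitles().forEach(System.out::println);
    }
}
```

The factory variant is usually the lower-risk choice, since the manual one still ignores disposal while it is emitting.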
Also take a look at this article: RxJava 2 Disposable - Under the hood