ObservableZip .subscribe() crash: NullPointerException, but what causes it?

I am seeing a lot of these crashes in the Google Play Console, but I cannot tell from the stack trace what in my code actually causes them. How can I find out?

java.lang.NullPointerException: 
  at io.reactivex.internal.operators.observable.ObservableZip$ZipCoordinator.subscribe (ObservableZip.java:110)
  at io.reactivex.internal.operators.observable.ObservableZip.subscribeActual (ObservableZip.java:72)
  at io.reactivex.Observable.subscribe (Observable.java:12284)
  at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run (ObservableSubscribeOn.java:96)
  at io.reactivex.Scheduler$DisposeTask.run (Scheduler.java:578)
  at io.reactivex.internal.schedulers.ScheduledRunnable.run (ScheduledRunnable.java:66)
  at io.reactivex.internal.schedulers.ScheduledRunnable.call (ScheduledRunnable.java:57)
  at java.util.concurrent.FutureTask.run (FutureTask.java:266)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:301)
  at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1167)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:641)
  at java.lang.Thread.run (Thread.java:919)

I checked RxJava2 ObservableZip.java:110, which is a call to .subscribe(). Several places in my code (mostly fragments) use Observable.zip(), and it looks like this:

DisposableObserver<List<WalletBalance>> walletBalanceUpdateObserver = new DisposableObserver<List<WalletBalance>>() {
    @Override
    public void onNext(List<WalletBalance> results) {
        if(isAdded() && getActivity() != null) {
            // process results ...
        }
        this.dispose();
    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void onComplete() { }
};

Observable.zip(apiCalls, responses -> {
    List<WalletBalance> allWallets = new ArrayList<>();
    for (Object res : responses) {
        Response<WalletBalance> response = (Response<WalletBalance>) res;
        if (response != null && response.body() != null)
            allWallets.add(response.body());
    }

    return allWallets;
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(walletBalanceUpdateObserver);

What is the cause of the crash here?

Problem

In the call Observable.zip(apiCalls, responses -> { ... }), one of the elements of apiCalls is most likely null.

Reproduction

import io.reactivex.Observable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Arrays;

class So65600653 {
  @Test
  void zipTest() {
    Observable<Integer> zip =
        Observable.zip(
            Arrays.asList(Observable.just(1), null, Observable.just(3)),
            objects -> {
              return 42;
            });

    Assertions.assertThatThrownBy(() -> {
        zip.test();
    }).isExactlyInstanceOf(NullPointerException.class);
  }
}

Stack trace

java.lang.NullPointerException
    at io.reactivex.internal.operators.observable.ObservableZip$ZipCoordinator.subscribe(ObservableZip.java:110)
    at io.reactivex.internal.operators.observable.ObservableZip.subscribeActual(ObservableZip.java:72)

RxJava source code

        for (int i = 0; i < len; i++) {
            if (cancelled) {
                return;
            }
            sources[i].subscribe(s[i]); // Line 110
        }

Here, either sources itself is null, or sources[i] is null; calling subscribe on a null reference throws the NPE.

But we can rule out sources being null, because it is checked above, before being handed to the ZipCoordinator. ZipCoordinator is called from:

    ZipCoordinator<T, R> zc = new ZipCoordinator<T, R>(observer, zipper, count, delayError);
    zc.subscribe(sources, bufferSize);

This code at the top of subscribeActual ensures that sources is not null:

    ObservableSource<? extends T>[] sources = this.sources;
    int count = 0;
    if (sources == null) {
        sources = new Observable[8];
        for (ObservableSource<? extends T> p : sourcesIterable) {

Solution

Filter out all null values from apiCalls before passing it to Observable#zip.
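
A minimal sketch of that filtering step, assuming apiCalls is a java.util.List. The class name ZipSources and the method withoutNulls are hypothetical helpers made up for illustration, not part of RxJava. A plain loop is used instead of streams so it also works on older Android API levels.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of RxJava): drops null entries from a list
// of sources so that Observable.zip never sees a null element.
final class ZipSources {
    private ZipSources() {}

    /** Returns a new list containing only the non-null entries, in order. */
    static <T> List<T> withoutNulls(List<? extends T> sources) {
        List<T> result = new ArrayList<>(sources.size());
        for (T source : sources) {
            if (source != null) {
                result.add(source);
            }
        }
        return result;
    }
}

// Usage at the call site from the question (assumes apiCalls is a List):
//
//   Observable.zip(ZipSources.withoutNulls(apiCalls), responses -> { ... })
//       .subscribeOn(Schedulers.io())
//       .observeOn(AndroidSchedulers.mainThread())
//       .subscribe(walletBalanceUpdateObserver);
```

Note that if every entry turns out to be null, the filtered list is empty and zip completes without emitting anything, so onNext is never called. It is probably also worth logging when a null is dropped, to find out where apiCalls is being built with a null element in the first place.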