RXJava 的问题

Problems with RXJava

我正在改编 what3words 的一些示例代码,以通过他们的 Java SDK 访问他们的 API。它使用 RXJava.

示例代码为:

Observable.fromCallable(() -> wrapper.convertTo3wa(new Coordinates(51.2423, -0.12423)).execute())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(result -> {
            if (result.isSuccessful()) {
                Log.i("MainActivity", String.format("3 word address: %s", result.getWords()));
            } else {
                Log.e("MainActivity", result.getError().getMessage());
            }
        });

首先。这会在构建时给出弃用警告和 IDE 警告 (Result of 'Observable.subscribe()' is ignored).

为了解决第一个问题,我在 Observable 前面添加了 Disposable myDisposable = 。这个对吗? (添加位置见下文)

接下来我需要添加超时,以便在请求超时时显示警告等。为此,我已将 .timeout(5000, TimeUnit.MILLISECONDS) 添加到构建器中。

这行得通,但是 timeouts 似乎在 Observables 上工作的方式是它们抛出异常,我不知道如何捕获和处理该异常。

我现在拥有的是:

Disposable myDisposable = Observable.fromCallable(() -> wrapper.convertTo3wa(new Coordinates(51.2423, -0.12423)).execute())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .timeout(5000, TimeUnit.MILLISECONDS)
        .subscribe(result -> {
            if (result.isSuccessful()) {
                Log.i("MainActivity", String.format("3 word address: %s", result.getWords()));
            } else {
                Log.e("MainActivity", result.getError().getMessage());
            }
        });

这构建并运行良好,并且未显示 API/deprecation 警告,但是当没有网络可用时,这会正确超时并抛出未处理的异常。

所以,代码似乎是正确的,但是究竟如何添加异常处理来捕获抛出的超时TimeoutException

我尝试了很多事情,包括:在整个 Observable 周围添加一个 try-catch 子句 - 这警告说 TimeoutException 不会被 `try; 中的代码抛出;并添加错误处理程序。

添加错误处理程序让我最接近,所以下面的代码是我所知道的:

Disposable myDisposable = Observable.fromCallable(() -> wrapper.convertTo3wa(new Coordinates(51.2423, -0.12423)).execute())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .timeout(5000, TimeUnit.MILLISECONDS)
        .subscribe(result -> {
            if (result.isSuccessful()) {
                Log.i("MainActivity", String.format("3 word address: %s", result.getWords()));
            } else {
                Log.e("MainActivity", result.getError().getMessage());
            }
         }, error -> {
             runOnUiThread(new Runnable() {
                 @Override
                 public void run() {
                     myTextView.setText(R.string.network_not_available);
                 }
             });
         });

这正确地捕获了超时并无误地更新了我的 UI,但是当网络恢复时,Observable 似乎正在尝试 return 并且抛出了一个空指针异常。

(更新,这个NPE实际上有时会在短时间后抛出,无论网络是否恢复......但它总是在网络恢复时抛出。)

我得到 FATAL EXCEPTION: RxCachedThreadScheduler-1java.lang.NullPointerException: Callable returned a null value. Null values are generally not allowed in 3.x operators and sources.

我是否需要销毁 Observable 或其他东西来防止 NPE?

您需要向 subscribe 调用添加 onError 处理程序:

    .subscribe(result -> {
        if (result.isSuccessful()) {
            Log.i("MainActivity", String.format("3 word address: %s", result.getWords()));
        } else {
            Log.e("MainActivity", result.getError().getMessage());
        }
     },
     error -> {
         // handle error here
     });

当异常进入没有 onError 处理程序的订阅调用时,它将抛出 OnErrorNotImplementedException,如下所示:

io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.util.concurrent.TimeoutException: The source did not signal an event for 1 seconds and has been terminated.

添加 onError 处理程序将防止这种情况发生,并且将改为调用 onError 处理程序。

这里发生了一些事情:

First of all. this gives a deprecation warning when building and a IDE warning (Result of 'Observable.subscribe()' is ignored).

subscribe()returns一个Disposable。这个想法是,当您不再对接收 observable 的输出感兴趣时,您可以调用 dispose() on the disposable 并且工作终止。这也可以防止内存泄漏。

举个例子,假设你有一个 Activity,然后你启动一个 Observable 来 运行 一个长网络查询,最终 post 对 Activity UI。如果用户在此任务完成之前离开,或者 Activity 以其他方式被销毁,那么您将不再对其输出感兴趣,因为不再有 UI 到 post 到。所以你可以在 onStop().

中调用 dispose()

So, the code seems to be correct, but how on earth do add the exception handling to catch the timeout TimeoutException that is thrown?

使用 subscribe 中的 error 块是一种选择,但还有其他选择。例如,如果您想继续使用 Result class,您可以使用类似 onErrorReturn(throwable -> Result.error(throwable)) 的内容。显然我在猜测 class 长什么样:

.timeout(5000, TimeUnit.MILLISECONDS)
.onErrorReturn(throwable -> Result.errorWithMessage(R.string.network_not_available))
.subscribe(result -> {
  if (result.isSuccessful()) {
    Log.i("MainActivity", String.format("3 word address: %s", result.getWords()));
  } else {
    myTextView.setText(result.getErrorMessage());
  }
});

java.lang.NullPointerException: Callable returned a null value. Null values are generally not allowed in 3.x operators and sources.

这个:

wrapper.convertTo3wa(new Coordinates(51.2423, -0.12423)).execute()

正在返回 null。你可以这样做:

Observable.fromCallable(() -> {
  Result<?> out = wrapper.convertTo3wa(new Coordinates(51.2423, -0.12423)).execute();
  if(out == null)
    out = Result.error(/*Returned null*/);
  }
  return out;
}