Android 中的 RxJava 异步任务

RxJava async task in Android

我正在尝试在 Android 中使用 RxJava 实现一个异步任务。 我尝试了以下代码,但没有用。它在 UI 线程上执行。我正在使用以下版本的 RxAndroid 0.24.0.

try {
    Observable.just(someMethodWhichThrowsException())
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(s -> onMergeComplete());
}
catch (IOException e) {
    e.printStackTrace();
}

但是,以下内容对我来说是异步的。

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            try {
                someMethodWhichThrowsException();
            } catch (IOException e) {
                e.printStackTrace();
            }

            subscriber.onCompleted();
        }
    });
    observable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe();

我正在尝试了解以下内容:

  1. 它们有什么区别?
  2. 创建异步任务时的最佳做法是什么?

使用 RxJava Async Utils 库中的 Async.start()。这将调用您在另一个线程上提供的函数。

示例:

Observable<String> observable = Async.start(new Func0<String>() {
    @Override
    public String call() {
        try {
            return someMethodWhichThrowsException();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
});

正如您所注意到的,检查异常必须包装到 RuntimeExceptions 中。

另见 https://github.com/ReactiveX/RxJava/wiki/Async-Operators#start

  1. 它们有什么区别?
Observable.just(someMethodWhichThrowsException())
    .subscribeOn(Schedulers.newThread())

这等同于:

Object someResult = someMethodWhichThrowsException();
Observable.just(someResult)
    .subscribeOn(Schedulers.newThread())

如您所见,这首先进行了同步方法调用,然后将其传递给Observable.just成为一个 Observable。

Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            ...
        }
    })
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();

但是,此方法将 运行 call 中的代码阻塞 订阅 。你已经告诉它你想订阅一个新线程(subscribeOn(Schedulers.newThread())),所以订阅发生在一个新线程上,并且订阅时获得 运行 的代码(call 块) 也在该线程上获得 运行。这类似于调用 Observable.defer.

  1. 创建异步任务时的最佳做法是什么?

好吧,这取决于您和您想要的行为。有时您希望异步代码立即开始 运行ning(在这种情况下,您可能希望为此目的使用其中一个运算符对其进行缓存)。为此,我肯定会考虑使用 Async Utils 库。

其他时候,您会希望它仅在订阅时 运行(这是此处示例中的行为)——例如,如果有副作用,或者您不关心什么时候它是 运行 并且只想使用内置函数从 UI 线程中获取一些东西。 Dan Lew mentions 在转换为 Rx 期间,Observable.defer 非常方便地获取旧代码并将其从 UI 线程中取出。