Observable/Subscriber in AsyncTask

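I am trying to implement an Observable/Subscriber with RxJava in an AsyncTask's onPostExecute(), but I can't figure out how to make the connection. I create the Observable in the onPostExecute method, and I want MyFragment to subscribe to it. How do I set this up?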

public class LoadAndStoreDataTask extends AsyncTask<String, Integer, String> {
    ...

    @Override
    protected void onPostExecute(final String result) {
        // create the observable (result is final so the anonymous class can capture it)
        Observable<String> myObservable = Observable.create(
                new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        subscriber.onNext(result);
                        subscriber.onCompleted();
                    }
                }
        );

        myObservable.subscribe(mySubscriber);
    }
}

public class MyFragment extends Fragment {

    ...

    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);

        Subscriber<String> mySubscriber = new Subscriber<String>() {
            @Override
            public void onNext(String s) { System.out.println(s); }

            @Override
            public void onCompleted() { }

            @Override
            public void onError(Throwable e) { }
        };
    }
    ...
}

RxJava is really meant to replace AsyncTask. In fact, I can say with confidence that what AsyncTask does is a subset of what RxJava does.

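In RxJava, a Subscriber is analogous to AsyncTask's onProgressUpdate or onPostExecute, and an Observable is analogous to the work done in doInBackground. Data is emitted from the Observable to the Subscriber, and any change to that stream is made with mapping operators. You probably don't need mapping right now, so I would restructure the RxJava code like this: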

 Observable<String> myObservable = Observable.create(
        new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {                      
               try {
                   String res = ...; // your original doInBackground
                   subscriber.onNext(res);
                   // onNext is comparable to AsyncTask.onProgressUpdate and is
                   // usually called repeatedly when the background process runs a loop
                   subscriber.onCompleted();
               } catch (SomeException e) {
                   // if the process throws an exception, or produces a result
                   // you'd consider an error, call onError
                   subscriber.onError(e);
               }
            }
        }
 )
 .subscribeOn(Schedulers.io())
 .observeOn(AndroidSchedulers.mainThread()) // If subscriber runs on UI thread
 .subscribe(new Subscriber<String>() {

        @Override
        public void onNext(String response) {
          // result from Observable.onNext. The methods below correspond
          // to their Observable counterparts. 
        }

        @Override
        public void onCompleted() {}

        @Override
        public void onError(Throwable e) {}
 });

AndroidSchedulers is available in RxAndroid. To use it, add this line to build.gradle:

compile 'io.reactivex:rxandroid:0.24.0'
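
To make the connection asked about in the question concrete, here is a minimal, dependency-free sketch of the same shape: the loading code only describes the work and returns an observable-like object, while the fragment owns the subscriber and does the subscribing. This is not RxJava's API. SimpleSubscriber, Work, SimpleObservable, loadAndStoreData and runAsFragment are hypothetical stand-ins written in plain Java so the example runs without Android or RxJava on the classpath, and the "loaded:" string is a placeholder for the real doInBackground work.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ObservableSketch {

    /** Stand-in for rx.Subscriber: the same three callbacks and nothing else. */
    interface SimpleSubscriber<T> {
        void onNext(T value);      // roughly onProgressUpdate / onPostExecute
        void onCompleted();        // the work finished normally
        void onError(Throwable e); // the work failed
    }

    /** Stand-in for Observable.OnSubscribe: the background work itself. */
    interface Work<T> {
        void call(SimpleSubscriber<? super T> subscriber) throws Exception;
    }

    /** Stand-in for rx.Observable: holds the work until somebody subscribes. */
    static final class SimpleObservable<T> {
        private final Work<T> work;

        SimpleObservable(Work<T> work) {
            this.work = work;
        }

        /**
         * Runs the work on the given executor, loosely like subscribeOn(Schedulers.io()).
         * Unlike Observable.create in the answer, this stand-in signals completion
         * and errors itself instead of leaving that to the work.
         */
        void subscribe(ExecutorService executor, final SimpleSubscriber<? super T> subscriber) {
            executor.execute(() -> {
                try {
                    work.call(subscriber);
                    subscriber.onCompleted();
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            });
        }
    }

    /** Hypothetical replacement for LoadAndStoreDataTask: it never sees the subscriber's owner. */
    static SimpleObservable<String> loadAndStoreData(final String input) {
        return new SimpleObservable<String>(subscriber -> {
            if (input == null) {
                throw new IllegalArgumentException("input is null");
            }
            subscriber.onNext("loaded:" + input); // placeholder for the real background work
        });
    }

    /** Plays the role of MyFragment.onCreate: creates the subscriber and subscribes it. */
    static List<String> runAsFragment(String input) throws InterruptedException {
        final List<String> events = Collections.synchronizedList(new ArrayList<String>());
        ExecutorService background = Executors.newSingleThreadExecutor();

        loadAndStoreData(input).subscribe(background, new SimpleSubscriber<String>() {
            @Override public void onNext(String s) { events.add("next " + s); }
            @Override public void onCompleted() { events.add("completed"); }
            @Override public void onError(Throwable e) { events.add("error " + e.getMessage()); }
        });

        background.shutdown();                            // no more work will be submitted
        background.awaitTermination(5, TimeUnit.SECONDS); // wait for the callbacks to run
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAsFragment("abc")); // prints [next loaded:abc, completed]
        System.out.println(runAsFragment(null));  // prints [error input is null]
    }
}
```

With real RxJava the same split would apply: a method returns the Observable<String>, and the fragment calls subscribe on it with its own Subscriber. Note that this sketch delivers callbacks on the background thread; on Android, the observeOn(AndroidSchedulers.mainThread()) call shown above is what moves them to the UI thread.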