Observable/Subscriber 在异步任务中
Observable/Subscriber in AsyncTask
我正在尝试在 AsyncTask
的 onPostExecute()
上使用 RxJava
实现 Observable/Subscriber,但我不知道如何建立连接。
我在 onPostExecute
方法中创建了 Observable
。我想要 MyFragment
订阅这个。我该如何设置?
public class LoadAndStoreDataTask extends AsyncTask<String, Integer, String> {
...
@Override
protected void onPostExecute(String result) {
// create the observable
Observable<String> myObservable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext(result);
subscriber.onCompleted();
}
}
);
myObservable.subscribe(mySubscriber);
}
}
public class MyFragment extends Fragment {
...
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void onNext(String s) { System.out.println(s); }
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) { }
};
}
...
}
实际上,RxJava 应该取代 AsycTask。事实上,我可以自信地说 AsyncTask 是 RxJava 的一个子集。
在 RxJava 中,Subscriber 类似于 AsyncTask.progressUpdate
或 onPostExecute
,Observable 类似于 doInBackground
中的进程。数据从 Observable 发送到 Subscriber 并且此流中的任何更改都是通过映射方法完成的。您现在可能不需要映射,所以我会像这样重新配置我的 RxJava:
Observable<String> myObservable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try{
String res = ...//your original doInBackground
subscriber.onNext(res);
// onNext would be comparable to AsyncTask.onProgressUpdate
// and usually applies when backgorund process runs a loop
subscriber.onCompleted();
}catch (SomeException e){
// if the process throws an exception or produces a result
// you'd consider error then use onError
subscriber.onError(e);
}
}
}
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) // If subscriber runs on UI thread
.subscribe(new Subscriber<String>() {
@Override
public void onNext(String response) {
// result from Observable.onNext. The methods below correspond
// to their Observable counterparts.
}
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
});
AndroidSchedulers
在 RxAndroid 中可用。要使用它,请将此行添加到 build.gradle :
compile 'io.reactivex:rxandroid:0.24.0'
我正在尝试在 AsyncTask
的 onPostExecute()
上使用 RxJava
实现 Observable/Subscriber,但我不知道如何建立连接。
我在 onPostExecute
方法中创建了 Observable
。我想要 MyFragment
订阅这个。我该如何设置?
public class LoadAndStoreDataTask extends AsyncTask<String, Integer, String> {
...
@Override
protected void onPostExecute(String result) {
// create the observable
Observable<String> myObservable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext(result);
subscriber.onCompleted();
}
}
);
myObservable.subscribe(mySubscriber);
}
}
public class MyFragment extends Fragment {
...
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void onNext(String s) { System.out.println(s); }
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) { }
};
}
...
}
实际上,RxJava 应该取代 AsycTask。事实上,我可以自信地说 AsyncTask 是 RxJava 的一个子集。
在 RxJava 中,Subscriber 类似于 AsyncTask.progressUpdate
或 onPostExecute
,Observable 类似于 doInBackground
中的进程。数据从 Observable 发送到 Subscriber 并且此流中的任何更改都是通过映射方法完成的。您现在可能不需要映射,所以我会像这样重新配置我的 RxJava:
Observable<String> myObservable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try{
String res = ...//your original doInBackground
subscriber.onNext(res);
// onNext would be comparable to AsyncTask.onProgressUpdate
// and usually applies when backgorund process runs a loop
subscriber.onCompleted();
}catch (SomeException e){
// if the process throws an exception or produces a result
// you'd consider error then use onError
subscriber.onError(e);
}
}
}
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) // If subscriber runs on UI thread
.subscribe(new Subscriber<String>() {
@Override
public void onNext(String response) {
// result from Observable.onNext. The methods below correspond
// to their Observable counterparts.
}
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
});
AndroidSchedulers
在 RxAndroid 中可用。要使用它,请将此行添加到 build.gradle :
compile 'io.reactivex:rxandroid:0.24.0'