Android RxJava 在单独的线程中处理 http 响应行
Android RxJava process http response rows in separate threads
我正在研究 RxJava,看看是否可以用它来替换几年前创建的应用程序中已弃用的 AsynTasks。
我的用例如下:
- 在 Schedulers.io 上发出一个 http 请求 returns 一些行
- 在并行线程中分别处理行
- 只有在处理完所有行后才在主线程上更新UI
有没有办法在 rx 中轻松完成第 2 步 java?
下面是一个代码示例。
谢谢
Observable.fromCallable(()-> {
// 1- get rows form server
ArrayList<HashMap<String, Object>> rows = new ArrayList<HashMap<String, Object>>();
// 2- process rows
for (HashMap row : rows) {
//manipulate row
row.put("test", "test"); <-- code that I want to parallelize
}
return rows;
})
.subscribeOn(Schedulers.io())// Execute in IO thread, i.e. background thread.
.observeOn(AndroidSchedulers.mainThread())// report or post the result to main thread.
.subscribeWith(new Observer<ArrayList<HashMap<String, Object>>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull ArrayList<HashMap<String, Object>> hashMaps) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
//3- update UI....
}
});
经过几次尝试,我找到了这个解决方案。
通过将日志添加到 processRow 函数,我看到它是针对多行并行调用的,一如既往,最后调用了 onComplete。
Observable.fromCallable(() -> getListResponse()) // 1- get rows form server
.subscribeOn(Schedulers.io())
.flatMapIterable(rowItem -> rowItem)
.flatMap(val -> Observable.just(val) //paralelize
.subscribeOn(Schedulers.computation())
.map(i -> processRow(i) )) // 2- process rows in parallel threads
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Object listResponse) { }
@Override
public void onError(@NonNull Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
//3- update UI....
}
});
我正在研究 RxJava,看看是否可以用它来替换几年前创建的应用程序中已弃用的 AsynTasks。
我的用例如下:
- 在 Schedulers.io 上发出一个 http 请求 returns 一些行
- 在并行线程中分别处理行
- 只有在处理完所有行后才在主线程上更新UI
有没有办法在 rx 中轻松完成第 2 步 java?
下面是一个代码示例。
谢谢
Observable.fromCallable(()-> {
// 1- get rows form server
ArrayList<HashMap<String, Object>> rows = new ArrayList<HashMap<String, Object>>();
// 2- process rows
for (HashMap row : rows) {
//manipulate row
row.put("test", "test"); <-- code that I want to parallelize
}
return rows;
})
.subscribeOn(Schedulers.io())// Execute in IO thread, i.e. background thread.
.observeOn(AndroidSchedulers.mainThread())// report or post the result to main thread.
.subscribeWith(new Observer<ArrayList<HashMap<String, Object>>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull ArrayList<HashMap<String, Object>> hashMaps) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
//3- update UI....
}
});
经过几次尝试,我找到了这个解决方案。
通过将日志添加到 processRow 函数,我看到它是针对多行并行调用的,一如既往,最后调用了 onComplete。
Observable.fromCallable(() -> getListResponse()) // 1- get rows form server
.subscribeOn(Schedulers.io())
.flatMapIterable(rowItem -> rowItem)
.flatMap(val -> Observable.just(val) //paralelize
.subscribeOn(Schedulers.computation())
.map(i -> processRow(i) )) // 2- process rows in parallel threads
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Object listResponse) { }
@Override
public void onError(@NonNull Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
//3- update UI....
}
});