Android RxJava 在单独的线程中处理 http 响应行

Android RxJava process http response rows in separate threads

我正在研究 RxJava,看看是否可以用它来替换几年前创建的应用程序中已弃用的 AsynTasks。

我的用例如下:

  1. 在 Schedulers.io 上发出一个 http 请求 returns 一些行
  2. 在并行线程中分别处理行
  3. 只有在处理完所有行后才在主线程上更新UI

有没有办法在 rx 中轻松完成第 2 步 java?

下面是一个代码示例。

谢谢

Observable.fromCallable(()-> {

    // 1- get rows form server
    ArrayList<HashMap<String, Object>> rows = new ArrayList<HashMap<String, Object>>();

    // 2- process rows 
    for (HashMap row : rows) {
        //manipulate row
        row.put("test", "test");  <-- code that I want to parallelize
    }

    return rows;
})
        .subscribeOn(Schedulers.io())// Execute in IO thread, i.e. background thread.
        .observeOn(AndroidSchedulers.mainThread())// report or post the result to main thread.
        .subscribeWith(new Observer<ArrayList<HashMap<String, Object>>>() {

            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull ArrayList<HashMap<String, Object>> hashMaps) {

            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {
                //3- update UI....
            }
        }); 

经过几次尝试,我找到了这个解决方案。

通过将日志添加到 processRow 函数,我看到它是针对多行并行调用的,一如既往,最后调用了 onComplete。

Observable.fromCallable(() -> getListResponse()) // 1- get rows form server
                .subscribeOn(Schedulers.io())
                .flatMapIterable(rowItem -> rowItem) 
                .flatMap(val -> Observable.just(val) //paralelize
                        .subscribeOn(Schedulers.computation())
                        .map(i -> processRow(i) )) // 2- process rows in parallel threads
                .observeOn(AndroidSchedulers.mainThread()) 
                .subscribe(new Observer<Object>() {

                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                        
                    }

                    @Override
                    public void onNext(@NonNull Object listResponse) { }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        
                        e.printStackTrace();
                        
                    }

                    @Override
                    public void onComplete() {
                       //3- update UI....
                    }
                });