Rxjava2 和 Retrofit2 Multiple/Parallel/Simultaneous api(post) 在不同线程上调用

Rxjava2 and Retrofit2 Multiple/Parallel/Simultaneous api(post) calls on different threads

我在下面有这段代码,我试图使用 Retrofit2 + RxJava2

实现多个连续的 api 调用
@Override
        public void onClick(View v) {

            count++;
            request.setName("rober");
            request.setVarryingValue(count);

            mApiService.apiService()
                    .getAccessToken(<params>)
                    .subscribeOn(Schedulers.newThread())
                    .flatMap(new Function<Auth, ObservableSource<?>>() {
                        @Override
                        public ObservableSource<?> apply(Auth authentication) throws Exception {

                            Observable<Void> postObservable = mApiService.apiService().postCall(request, authentication.getAuth())
                                    .subscribeOn(Schedulers.io());
                            postObservable.subscribe(new Observer<Void>() {

                                @Override
                                public void onSubscribe(Disposable d) {}

                                @Override
                                public void onNext(Void value) {
                                    Log.e("Thread", " Thread : " + Thread.currentThread());
                                }

                                @Override
                                public void onError(Throwable e) {
                                    e.printStackTrace();
                                }

                                @Override
                                public void onComplete() {}
                            });

                            return postObservable;
                        }
                    }).subscribe(new Observer<Object>() {
                @Override
                public void onSubscribe(Disposable d) {}

                @Override
                public void onNext(Object value) {}

                @Override
                public void onError(Throwable e) {}

                @Override
                public void onComplete() {}
            });
        }
    });

我的期望是对于每次点击,将调用分派到 new/different 线程以执行特定的 api POST 调用,但我有这些 post 值从内部 api 调用

{"name ":"rober"," some_varrying_number ":"4"}
04-03 07:23:08.319 27225-27378/edu.rx.test D/OkHttp: --> END POST (1679-byte 
body)
04-03 07:23:08.322 27225-27380/edu.rx.test D/OkHttp: --> POST 
http://mydomain.test.post.server http/1.1

{"name ":"rober"," some_varrying_number ":"4"}
04-03 07:23:08.319 27225-27378/edu.rx.test D/OkHttp: --> END POST (1679-byte 
body)
04-03 07:23:08.322 27225-27380/edu.rx.test D/OkHttp: --> POST 
http://mydomain.test.post.server http/1.1


{"name ":"rober"," some_varrying_number ":"4"}
04-03 07:23:08.319 27225-27378/edu.rx.test D/OkHttp: --> END POST (1679-byte 
body)
04-03 07:23:08.322 27225-27380/edu.rx.test D/OkHttp: --> POST 
http://mydomain.test.post.server http/1.1


{"name ":"rober"," some_varrying_number ":"4"}
04-03 07:23:08.319 27225-27378/edu.rx.test D/OkHttp: --> END POST (1679-byte 
body)
04-03 07:23:08.322 27225-27380/edu.rx.test D/OkHttp: --> POST 
http://mydomain.test.post.server http/1.1

它似乎只执行最后一个调用,我了解到 Rx 中没有完全 "Parallel" 调用,因为如果它这样做,它将违反所有反应原则,但有很多解决方法对于他们所说的,现在我正在尝试使用我 posted 的代码实现 "Parallel" 调用,但运气不好 :(,我在这里需要一些帮助,

如有任何帮助,我们将不胜感激。谢谢!

编辑:过程是这样的 1.先获取auth token 2. 成功验证令牌后,继续 api POST 调用

数字 2 依赖于数字 1

1 和 2 将始终在点击事件时执行。

编辑: 我会post一张图片来说明清楚

我不太清楚为什么所有 post 请求都是同时完成的,完整的日志 onClick() 时间和 getAccessToken() 请求,并且 onNext()排放可能有帮助。

您的代码似乎做对了,您将为每个请求打开一个新线程。
但是,无论如何,由于您的日志上有 4 个打印件,但都具有相同的计数,问题可能是因为 request 参数是一个字段,因此在 after[=31] 之后共享和访问=] getAccessToken() 发出一个项目,因此你基本上做了 4 个请求但使用相同的数据。
您应该为每个新创建的 flatMap() 分配一个专用计数变量。

除此之外,在 flatMap() 运算符中订阅 postObservable 是错误的,您应该 return 它并且流将订阅它并将其合并回 onNext()排放量。
使用您的代码,您正在执行每个 post 请求 两次 ,一个由您明确执行,另一个由 flatMap() 操作员执行(这也可以解释多个日志相同的请求,但如果不查看日志中的所有数据就很难判断)。

在您发送第一个 POST 请求时,request 的变化值已经是 4。那是因为在您点击 4 次之后,第一个 POST request 将在当时(4)采用 request,而不是在设置其变化值时采用 request

解决方案是使 request 对象成为 onClick() 方法中的局部对象

public void onClick(View v) {
    RequestClass request = new RequestClass();
    request.setName("rober");
    request.setVarryingValue(count);

    //Your code
}

或者将 count 值分配给临时变量,并在发送 POST 请求之前将其设置为您的 request,但要注意线程安全

public void onClick(View v) {
    //Saving the value
    int temp = count++;
    request.setName("rober");

    mApiService.apiService()
            .getAccessToken(<params>)
            .subscribeOn(Schedulers.newThread())
            .flatMap(new Function<Auth, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(Auth authentication) throws Exception {
                    //Set it to the request
                    //BE CAREFUL  because the `request` object is now being accessed from multiple threads
                    request.setVarryingValue(temp);

                    Observable<Void> postObservable = 
                        mApiService
                            .apiService()
                            .postCall(request, authentication.getAuth())
                            .subscribeOn(Schedulers.io())
                            .subscribe();

                    return postObservable;
                }
            })
            .subscribe();
}

根据您建议的两个答案,我想出了以下代码,

  • 请求变成局部变量而不是字段
  • 取消平面地图内的订阅
  • 将其链接到外部可观察对象的 onNext

    @Override
    public void onClick(View v) {
    
    count++;
    Request request = new Request();
    request.setName("rober");
    request.setVarryingValue(count);
    
    mApiService.apiService()
            .getAccessToken(<params>)             
            .subscribeOn(Schedulers.io())
            .flatMap(new Function<Auth, ObservableSource<Void>>() {
                @Override
                public ObservableSource<Void> apply(Auth authentication) throws Exception {
                    return mApiService.apiService().postCall(request, authentication.getAuth());
                }
            })  
            .subscribe(new Observer<ObservableSource<Void>>() {
    
                @Override
                public void onSubscribe(Disposable d) {}
    
                @Override
                public void onNext(ObservableSource<Void> sourceFromFlatMap) {
    
                    sourceFromFlatMap.subscribe(new Observer<Void>() {
    
                        @Override
                        public void onSubscribe(Disposable d) {}
    
                        @Override
                        public void onNext(Void value) {
                        }
    
                        @Override
                        public void onError(Throwable e) {}
    
                        @Override
                        public void onComplete() {}
                    });
                }
    
                @Override
                public void onError(Throwable e) {}
    
                @Override
                public void onComplete() {}
            });
     });
    

虽然我仍然不确定我是否以正确的方式使用 Rx