Rxjava2 和 Retrofit2 Multiple/Parallel/Simultaneous api(post) 在不同线程上调用
Rxjava2 and Retrofit2 Multiple/Parallel/Simultaneous api(post) calls on different threads
我在下面有这段代码,我试图使用 Retrofit2 + RxJava2
实现多个连续的 api 调用
@Override
public void onClick(View v) {
count++;
request.setName("rober");
request.setVarryingValue(count);
mApiService.apiService()
.getAccessToken(<params>)
.subscribeOn(Schedulers.newThread())
.flatMap(new Function<Auth, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Auth authentication) throws Exception {
Observable<Void> postObservable = mApiService.apiService().postCall(request, authentication.getAuth())
.subscribeOn(Schedulers.io());
postObservable.subscribe(new Observer<Void>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(Void value) {
Log.e("Thread", " Thread : " + Thread.currentThread());
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {}
});
return postObservable;
}
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(Object value) {}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {}
});
}
});
我的期望是对于每次点击,将调用分派到 new/different 线程以执行特定的 api POST 调用,但我有这些 post 值从内部 api 调用
{"name ":"rober"," some_varrying_number ":"4"}
04-03 07:23:08.319 27225-27378/edu.rx.test D/OkHttp: --> END POST (1679-byte
body)
04-03 07:23:08.322 27225-27380/edu.rx.test D/OkHttp: --> POST
http://mydomain.test.post.server http/1.1
{"name ":"rober"," some_varrying_number ":"4"}
04-03 07:23:08.319 27225-27378/edu.rx.test D/OkHttp: --> END POST (1679-byte
body)
04-03 07:23:08.322 27225-27380/edu.rx.test D/OkHttp: --> POST
http://mydomain.test.post.server http/1.1
{"name ":"rober"," some_varrying_number ":"4"}
04-03 07:23:08.319 27225-27378/edu.rx.test D/OkHttp: --> END POST (1679-byte
body)
04-03 07:23:08.322 27225-27380/edu.rx.test D/OkHttp: --> POST
http://mydomain.test.post.server http/1.1
{"name ":"rober"," some_varrying_number ":"4"}
04-03 07:23:08.319 27225-27378/edu.rx.test D/OkHttp: --> END POST (1679-byte
body)
04-03 07:23:08.322 27225-27380/edu.rx.test D/OkHttp: --> POST
http://mydomain.test.post.server http/1.1
它似乎只执行最后一个调用,我了解到 Rx 中没有完全 "Parallel" 调用,因为如果它这样做,它将违反所有反应原则,但有很多解决方法对于他们所说的,现在我正在尝试使用我 posted 的代码实现 "Parallel" 调用,但运气不好 :(,我在这里需要一些帮助,
如有任何帮助,我们将不胜感激。谢谢!
编辑:过程是这样的
1.先获取auth token
2. 成功验证令牌后,继续 api POST 调用
数字 2 依赖于数字 1
1 和 2 将始终在点击事件时执行。
编辑: 我会post一张图片来说明清楚
- 我有 2 个来源(可以是 3 个或更多)
- 这些来源来自点击事件
- 每个源会处理不同的网络调用
- 我正在尝试在不同的线程中执行它(使其异步)
每个将 post 服务器上的不同值
但它似乎只处理最后一个事件(点击)
- 使用上图,我预计 POST A 和 POST B 将发生在 parallel/differently
- 但它只执行POST B(实际上是2 POST B)
- 我期待它会以不同的方式执行 POST A & POST B
我不太清楚为什么所有 post 请求都是同时完成的,完整的日志 onClick()
时间和 getAccessToken()
请求,并且 onNext()
排放可能有帮助。
您的代码似乎做对了,您将为每个请求打开一个新线程。
但是,无论如何,由于您的日志上有 4 个打印件,但都具有相同的计数,问题可能是因为 request
参数是一个字段,因此在 after[=31] 之后共享和访问=] getAccessToken()
发出一个项目,因此你基本上做了 4 个请求但使用相同的数据。
您应该为每个新创建的 flatMap()
分配一个专用计数变量。
除此之外,在 flatMap()
运算符中订阅 postObservable
是错误的,您应该 return 它并且流将订阅它并将其合并回 onNext()
排放量。
使用您的代码,您正在执行每个 post 请求 两次 ,一个由您明确执行,另一个由 flatMap()
操作员执行(这也可以解释多个日志相同的请求,但如果不查看日志中的所有数据就很难判断)。
在您发送第一个 POST 请求时,request
的变化值已经是 4。那是因为在您点击 4 次之后,第一个 POST request 将在当时(4)采用 request
,而不是在设置其变化值时采用 request
。
解决方案是使 request
对象成为 onClick()
方法中的局部对象
public void onClick(View v) {
RequestClass request = new RequestClass();
request.setName("rober");
request.setVarryingValue(count);
//Your code
}
或者将 count
值分配给临时变量,并在发送 POST 请求之前将其设置为您的 request
,但要注意线程安全
public void onClick(View v) {
//Saving the value
int temp = count++;
request.setName("rober");
mApiService.apiService()
.getAccessToken(<params>)
.subscribeOn(Schedulers.newThread())
.flatMap(new Function<Auth, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Auth authentication) throws Exception {
//Set it to the request
//BE CAREFUL because the `request` object is now being accessed from multiple threads
request.setVarryingValue(temp);
Observable<Void> postObservable =
mApiService
.apiService()
.postCall(request, authentication.getAuth())
.subscribeOn(Schedulers.io())
.subscribe();
return postObservable;
}
})
.subscribe();
}
根据您建议的两个答案,我想出了以下代码,
- 请求变成局部变量而不是字段
- 取消平面地图内的订阅
将其链接到外部可观察对象的 onNext
@Override
public void onClick(View v) {
count++;
Request request = new Request();
request.setName("rober");
request.setVarryingValue(count);
mApiService.apiService()
.getAccessToken(<params>)
.subscribeOn(Schedulers.io())
.flatMap(new Function<Auth, ObservableSource<Void>>() {
@Override
public ObservableSource<Void> apply(Auth authentication) throws Exception {
return mApiService.apiService().postCall(request, authentication.getAuth());
}
})
.subscribe(new Observer<ObservableSource<Void>>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(ObservableSource<Void> sourceFromFlatMap) {
sourceFromFlatMap.subscribe(new Observer<Void>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(Void value) {
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {}
});
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {}
});
});
虽然我仍然不确定我是否以正确的方式使用 Rx
我在下面有这段代码,我试图使用 Retrofit2 + RxJava2
实现多个连续的 api 调用@Override
public void onClick(View v) {
count++;
request.setName("rober");
request.setVarryingValue(count);
mApiService.apiService()
.getAccessToken(<params>)
.subscribeOn(Schedulers.newThread())
.flatMap(new Function<Auth, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Auth authentication) throws Exception {
Observable<Void> postObservable = mApiService.apiService().postCall(request, authentication.getAuth())
.subscribeOn(Schedulers.io());
postObservable.subscribe(new Observer<Void>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(Void value) {
Log.e("Thread", " Thread : " + Thread.currentThread());
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {}
});
return postObservable;
}
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(Object value) {}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {}
});
}
});
我的期望是对于每次点击,将调用分派到 new/different 线程以执行特定的 api POST 调用,但我有这些 post 值从内部 api 调用
{"name ":"rober"," some_varrying_number ":"4"}
04-03 07:23:08.319 27225-27378/edu.rx.test D/OkHttp: --> END POST (1679-byte
body)
04-03 07:23:08.322 27225-27380/edu.rx.test D/OkHttp: --> POST
http://mydomain.test.post.server http/1.1
{"name ":"rober"," some_varrying_number ":"4"}
04-03 07:23:08.319 27225-27378/edu.rx.test D/OkHttp: --> END POST (1679-byte
body)
04-03 07:23:08.322 27225-27380/edu.rx.test D/OkHttp: --> POST
http://mydomain.test.post.server http/1.1
{"name ":"rober"," some_varrying_number ":"4"}
04-03 07:23:08.319 27225-27378/edu.rx.test D/OkHttp: --> END POST (1679-byte
body)
04-03 07:23:08.322 27225-27380/edu.rx.test D/OkHttp: --> POST
http://mydomain.test.post.server http/1.1
{"name ":"rober"," some_varrying_number ":"4"}
04-03 07:23:08.319 27225-27378/edu.rx.test D/OkHttp: --> END POST (1679-byte
body)
04-03 07:23:08.322 27225-27380/edu.rx.test D/OkHttp: --> POST
http://mydomain.test.post.server http/1.1
它似乎只执行最后一个调用,我了解到 Rx 中没有完全 "Parallel" 调用,因为如果它这样做,它将违反所有反应原则,但有很多解决方法对于他们所说的,现在我正在尝试使用我 posted 的代码实现 "Parallel" 调用,但运气不好 :(,我在这里需要一些帮助,
如有任何帮助,我们将不胜感激。谢谢!
编辑:过程是这样的 1.先获取auth token 2. 成功验证令牌后,继续 api POST 调用
数字 2 依赖于数字 1
1 和 2 将始终在点击事件时执行。
编辑: 我会post一张图片来说明清楚
- 我有 2 个来源(可以是 3 个或更多)
- 这些来源来自点击事件
- 每个源会处理不同的网络调用
- 我正在尝试在不同的线程中执行它(使其异步)
每个将 post 服务器上的不同值
但它似乎只处理最后一个事件(点击)
- 使用上图,我预计 POST A 和 POST B 将发生在 parallel/differently
- 但它只执行POST B(实际上是2 POST B)
- 我期待它会以不同的方式执行 POST A & POST B
我不太清楚为什么所有 post 请求都是同时完成的,完整的日志 onClick()
时间和 getAccessToken()
请求,并且 onNext()
排放可能有帮助。
您的代码似乎做对了,您将为每个请求打开一个新线程。
但是,无论如何,由于您的日志上有 4 个打印件,但都具有相同的计数,问题可能是因为 request
参数是一个字段,因此在 after[=31] 之后共享和访问=] getAccessToken()
发出一个项目,因此你基本上做了 4 个请求但使用相同的数据。
您应该为每个新创建的 flatMap()
分配一个专用计数变量。
除此之外,在 flatMap()
运算符中订阅 postObservable
是错误的,您应该 return 它并且流将订阅它并将其合并回 onNext()
排放量。
使用您的代码,您正在执行每个 post 请求 两次 ,一个由您明确执行,另一个由 flatMap()
操作员执行(这也可以解释多个日志相同的请求,但如果不查看日志中的所有数据就很难判断)。
在您发送第一个 POST 请求时,request
的变化值已经是 4。那是因为在您点击 4 次之后,第一个 POST request 将在当时(4)采用 request
,而不是在设置其变化值时采用 request
。
解决方案是使 request
对象成为 onClick()
方法中的局部对象
public void onClick(View v) {
RequestClass request = new RequestClass();
request.setName("rober");
request.setVarryingValue(count);
//Your code
}
或者将 count
值分配给临时变量,并在发送 POST 请求之前将其设置为您的 request
,但要注意线程安全
public void onClick(View v) {
//Saving the value
int temp = count++;
request.setName("rober");
mApiService.apiService()
.getAccessToken(<params>)
.subscribeOn(Schedulers.newThread())
.flatMap(new Function<Auth, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Auth authentication) throws Exception {
//Set it to the request
//BE CAREFUL because the `request` object is now being accessed from multiple threads
request.setVarryingValue(temp);
Observable<Void> postObservable =
mApiService
.apiService()
.postCall(request, authentication.getAuth())
.subscribeOn(Schedulers.io())
.subscribe();
return postObservable;
}
})
.subscribe();
}
根据您建议的两个答案,我想出了以下代码,
- 请求变成局部变量而不是字段
- 取消平面地图内的订阅
将其链接到外部可观察对象的 onNext
@Override public void onClick(View v) { count++; Request request = new Request(); request.setName("rober"); request.setVarryingValue(count); mApiService.apiService() .getAccessToken(<params>) .subscribeOn(Schedulers.io()) .flatMap(new Function<Auth, ObservableSource<Void>>() { @Override public ObservableSource<Void> apply(Auth authentication) throws Exception { return mApiService.apiService().postCall(request, authentication.getAuth()); } }) .subscribe(new Observer<ObservableSource<Void>>() { @Override public void onSubscribe(Disposable d) {} @Override public void onNext(ObservableSource<Void> sourceFromFlatMap) { sourceFromFlatMap.subscribe(new Observer<Void>() { @Override public void onSubscribe(Disposable d) {} @Override public void onNext(Void value) { } @Override public void onError(Throwable e) {} @Override public void onComplete() {} }); } @Override public void onError(Throwable e) {} @Override public void onComplete() {} }); });
虽然我仍然不确定我是否以正确的方式使用 Rx