如何从 Retrofit、RxJava 中的所有请求发出多个请求和响应,Android
How to make multiple request and responses from all the requests in Retrofit, RxJava, Android
我有一个场景,我想为多个设备调用相同的 API 并在完成所有请求后显示结果。
我正在使用改造 2。
我对 RxJava 知之甚少。我认为 zip 运算符适合这个。所以实现如下。
API 在 ApiInterface
中:
@PUT(AppConstants.BASE_URL + AppConstants.PATH_SEPARATOR + "/user/endpoint")
Observable<ResponseBody> updateInfo(@Header("Authorization") String token, @Query("device_id") String deviceId, @Body JsonObject body);
这里是一个调用API的方法。它在 Map 中获取设备 ID 及其主体。此方法为 Map 中可用的每个设备 ID 调用 API。
public void updateAllInfo(final HashMap<String, String> deviceIdMap, final ApiResponseListener listener) {
List<Observable<ResponseBody>> requests = new ArrayList<>();
ArrayList<String> reqIdList = new ArrayList<>();
for (Map.Entry<String, String> entry : map.entrySet()) {
String deviceId = entry.getKey();
String jsonBodyStr = entry.getValue();
Gson gson = new Gson();
JsonObject jsonBody = gson.fromJson(jsonBodyStr, JsonObject.class);
reqIdList.add(deviceId);
requests.add(apiInterface.updateSchedules("accessToken", deviceId, jsonBody));
}
Observable.zip(requests, new Function<Object[], List<ResponseBody>>() {
@Override
public List<ResponseBody> apply(Object[] objects) throws Exception {
Log.e("onSubscribe", "apply : " + objects.length);
List<ResponseBody> dataResponses = new ArrayList<>();
for (Object o : objects) {
dataResponses.add((ResponseBody) o);
}
return dataResponses;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<ResponseBody>>() {
@Override
public void accept(List<ResponseBody> responseBodies) throws Exception {
Log.e("onSubscribe", "YOUR DATA IS HERE: " + responseBodies.size());
for (int i = 0; i < responseBodies.size(); i++) {
Log.e(TAG, "Response received for " + i + " is : " + responseBodies.get(i).string());
}
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e("onSubscribe", "Throwable: " + throwable);
}
});
}
我想获得每个设备 ID 的响应(成功/失败)。意味着我需要响应以及调用 API 的 ID。
使用 zip 运算符,如果任何 API 失败,则在 accept(Throwable throwable)
方法中收到失败。如果任何 API 失败,我认为 zip 运算符不会调用下一个 API.
如何获得所有请求的响应(成功或失败)?
还需要一些东西来指示响应是针对哪个请求/设备 id(一些映射)来显示结果。
我可以使用其他运算符来代替 zip 吗?
有什么建议/帮助吗?
我有点生疏了java,所以我会用Kotlin写我的答案,你自己转换应该没有问题。
创建一个帮助程序 class,它将包括 ResponseBody
和 deviceId
:
data class IdentifiedResponseBody(
val deviceId: String,
val responseBody: ResponseBody?
)
然后:
// change the signature of your requests list to return IdentifiedResponseBody observables
val requests = mutableListOf<Observable<IdentifiedResponseBody>>()
...
// your stated API have updateInfo instead of updateSchedules, but I will assume they have the same signature
requests.add(
apiInterface.updateSchedules("accessToken", deviceId, jsonBody)
.map { responseBody ->
// map the added observable to return IdentifiedResponseBody
IdentifiedResponseBody(deviceId, responseBody)
}
.onErrorReturn { error ->
// return an item here instead of throwing error, so that the other observables will still execute
IdentifiedResponseBody(deviceId, null)
}
)
最后,使用merge
代替zip
:
Observable.merge(requests)
.subscribeOn(Schedulers.io())
// it's a question if you want to observe these on main thread, depends on context of your application
.subscribe(
{ identifiedResponse ->
// here you get both the deviceId and the responseBody
Log.d("RESPNOSE", "deviceId=${identifiedResponse.deviceId}, body=${identifiedResponse.responseBody}")
if (responseBody == null || responseBody.hasError()) {
// request for this deviceId failed, handle it
}
},
{ error ->
Log.e("onSubscribe", "Throwable: " + error)
}
)
参见merge
:http://reactivex.io/documentation/operators/merge.html
参见zip
:http://reactivex.io/documentation/operators/zip.html
您应该看到其中的显着差异:zip
将您对映射函数定义的单个项目(即您的案例中的响应列表)的响应组合在一起,而 merge
单独发出所有响应,当时他们 returned。在此处 zip
的情况下,合并结果是 return 在所有请求完成的时刻(且仅);你可能不想要这种行为,就好像单个请求不会 return 响应一样,你根本不会得到任何响应。
更新
java 等价物应如下所示,但在尝试之前进行修改,因为我不确定是否正确转换了所有内容:
requests.add(
apiInterface.updateSchedules("accessToken", deviceId, jsonBody)
.map(new Function<ResponseBody, IdentifiedResponseBody>() {
@Override
public IdentifiedResponseBody apply(ResponseBody responseBody) throws Exception {
return new IdentifiedResponseBody(deviceId, responseBody);
}
})
.onErrorReturn(new Function<Throwable, IdentifiedResponseBody>() {
@Override
public IdentifiedResponseBody apply(Throwable throwable) throws Exception {
return new IdentifiedResponseBody(deviceId, null);
}
})
);
Observable.merge(requests)
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<IdentifiedResponseBody>() {
@Override
public void accept(IdentifiedResponseBody identifiedResponseBody) throws Exception {
// same logic as from kotlin part
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e("onSubscribe", "Throwable: " + throwable);
}
});
更新 2
在你问的评论中:
Is there any way from which I can get final callback for all requests
completed
这就是使用 Observable
而不是 Single
/Completable
的问题,除非您明确关闭通道或抛出错误,否则它不会完成。在理想情况下,Observable
应该用于连续发出一些数据的流,例如到 Room DB 的开放通道,因为不知道数据库会更改多少次。我承认在你的情况下似乎很难应用 Observable
以外的东西。但是有一个解决方法:
Observable.merge(requests)
// emits only this much items, then close this channel
.take(requests.size.toLong())
// executed when the channel is closed or disposed
.doFinally {
// todo: finalCallback
}
.subscribeOn(Schedulers.io())
.subscribe(...)
代码又是Kotlin,改成java应该不难。请检查 Observable.take()
的作用:http://reactivex.io/documentation/operators/take.html
我有一个场景,我想为多个设备调用相同的 API 并在完成所有请求后显示结果。 我正在使用改造 2。 我对 RxJava 知之甚少。我认为 zip 运算符适合这个。所以实现如下。
API 在 ApiInterface
中:
@PUT(AppConstants.BASE_URL + AppConstants.PATH_SEPARATOR + "/user/endpoint")
Observable<ResponseBody> updateInfo(@Header("Authorization") String token, @Query("device_id") String deviceId, @Body JsonObject body);
这里是一个调用API的方法。它在 Map 中获取设备 ID 及其主体。此方法为 Map 中可用的每个设备 ID 调用 API。
public void updateAllInfo(final HashMap<String, String> deviceIdMap, final ApiResponseListener listener) {
List<Observable<ResponseBody>> requests = new ArrayList<>();
ArrayList<String> reqIdList = new ArrayList<>();
for (Map.Entry<String, String> entry : map.entrySet()) {
String deviceId = entry.getKey();
String jsonBodyStr = entry.getValue();
Gson gson = new Gson();
JsonObject jsonBody = gson.fromJson(jsonBodyStr, JsonObject.class);
reqIdList.add(deviceId);
requests.add(apiInterface.updateSchedules("accessToken", deviceId, jsonBody));
}
Observable.zip(requests, new Function<Object[], List<ResponseBody>>() {
@Override
public List<ResponseBody> apply(Object[] objects) throws Exception {
Log.e("onSubscribe", "apply : " + objects.length);
List<ResponseBody> dataResponses = new ArrayList<>();
for (Object o : objects) {
dataResponses.add((ResponseBody) o);
}
return dataResponses;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<ResponseBody>>() {
@Override
public void accept(List<ResponseBody> responseBodies) throws Exception {
Log.e("onSubscribe", "YOUR DATA IS HERE: " + responseBodies.size());
for (int i = 0; i < responseBodies.size(); i++) {
Log.e(TAG, "Response received for " + i + " is : " + responseBodies.get(i).string());
}
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e("onSubscribe", "Throwable: " + throwable);
}
});
}
我想获得每个设备 ID 的响应(成功/失败)。意味着我需要响应以及调用 API 的 ID。
使用 zip 运算符,如果任何 API 失败,则在 accept(Throwable throwable)
方法中收到失败。如果任何 API 失败,我认为 zip 运算符不会调用下一个 API.
如何获得所有请求的响应(成功或失败)?
还需要一些东西来指示响应是针对哪个请求/设备 id(一些映射)来显示结果。
我可以使用其他运算符来代替 zip 吗?
有什么建议/帮助吗?
我有点生疏了java,所以我会用Kotlin写我的答案,你自己转换应该没有问题。
创建一个帮助程序 class,它将包括 ResponseBody
和 deviceId
:
data class IdentifiedResponseBody(
val deviceId: String,
val responseBody: ResponseBody?
)
然后:
// change the signature of your requests list to return IdentifiedResponseBody observables
val requests = mutableListOf<Observable<IdentifiedResponseBody>>()
...
// your stated API have updateInfo instead of updateSchedules, but I will assume they have the same signature
requests.add(
apiInterface.updateSchedules("accessToken", deviceId, jsonBody)
.map { responseBody ->
// map the added observable to return IdentifiedResponseBody
IdentifiedResponseBody(deviceId, responseBody)
}
.onErrorReturn { error ->
// return an item here instead of throwing error, so that the other observables will still execute
IdentifiedResponseBody(deviceId, null)
}
)
最后,使用merge
代替zip
:
Observable.merge(requests)
.subscribeOn(Schedulers.io())
// it's a question if you want to observe these on main thread, depends on context of your application
.subscribe(
{ identifiedResponse ->
// here you get both the deviceId and the responseBody
Log.d("RESPNOSE", "deviceId=${identifiedResponse.deviceId}, body=${identifiedResponse.responseBody}")
if (responseBody == null || responseBody.hasError()) {
// request for this deviceId failed, handle it
}
},
{ error ->
Log.e("onSubscribe", "Throwable: " + error)
}
)
参见merge
:http://reactivex.io/documentation/operators/merge.html
参见zip
:http://reactivex.io/documentation/operators/zip.html
您应该看到其中的显着差异:zip
将您对映射函数定义的单个项目(即您的案例中的响应列表)的响应组合在一起,而 merge
单独发出所有响应,当时他们 returned。在此处 zip
的情况下,合并结果是 return 在所有请求完成的时刻(且仅);你可能不想要这种行为,就好像单个请求不会 return 响应一样,你根本不会得到任何响应。
更新
java 等价物应如下所示,但在尝试之前进行修改,因为我不确定是否正确转换了所有内容:
requests.add(
apiInterface.updateSchedules("accessToken", deviceId, jsonBody)
.map(new Function<ResponseBody, IdentifiedResponseBody>() {
@Override
public IdentifiedResponseBody apply(ResponseBody responseBody) throws Exception {
return new IdentifiedResponseBody(deviceId, responseBody);
}
})
.onErrorReturn(new Function<Throwable, IdentifiedResponseBody>() {
@Override
public IdentifiedResponseBody apply(Throwable throwable) throws Exception {
return new IdentifiedResponseBody(deviceId, null);
}
})
);
Observable.merge(requests)
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<IdentifiedResponseBody>() {
@Override
public void accept(IdentifiedResponseBody identifiedResponseBody) throws Exception {
// same logic as from kotlin part
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e("onSubscribe", "Throwable: " + throwable);
}
});
更新 2
在你问的评论中:
Is there any way from which I can get final callback for all requests completed
这就是使用 Observable
而不是 Single
/Completable
的问题,除非您明确关闭通道或抛出错误,否则它不会完成。在理想情况下,Observable
应该用于连续发出一些数据的流,例如到 Room DB 的开放通道,因为不知道数据库会更改多少次。我承认在你的情况下似乎很难应用 Observable
以外的东西。但是有一个解决方法:
Observable.merge(requests)
// emits only this much items, then close this channel
.take(requests.size.toLong())
// executed when the channel is closed or disposed
.doFinally {
// todo: finalCallback
}
.subscribeOn(Schedulers.io())
.subscribe(...)
代码又是Kotlin,改成java应该不难。请检查 Observable.take()
的作用:http://reactivex.io/documentation/operators/take.html