Make multiple asynchronous calls (fire-and-forget calls) at once using RxJava Observable
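I have a list of downstream API calls (about 10) that I need to invoke asynchronously, all at once. Until now I have been using a list of callables,
List<RequestContextPreservingCallable<FutureResponse>> callables
I add the API calls to this list and submit it at the end using executeAsyncNoReturnRequestContextPreservingCallables.
How can I do the same thing with RxJava Observables?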
List<RequestContextPreservingCallable<FutureResponse>> callables =
        new ArrayList<RequestContextPreservingCallable<FutureResponse>>();
callables.add(apiOneConnector.CallToApiOne(name));
callables.add(apiTwoConnector.CallToApiTWO(sessionId));
....
// execute all the calls
executeAsyncNoReturnRequestContextPreservingCallables(callables);
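You can use the zip operator. zip takes multiple observables, subscribes to all of them, and continues once every one of them has delivered a result. Note that zip does not add concurrency by itself: the sources only run in parallel if each one is asynchronous on its own, for example because it has its own subscribeOn.
You can then transform those results into whatever form you need and pass it on downstream.
Following your example, suppose you have several API calls that fetch a name, a session ID, and so on: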
Observable.zip(getNameRequest(), getSessionIdRequest(), new BiFunction<String, String, Object>() {
        @Override
        public Object apply(String name, String sessionId) throws Exception {
            // Called once both sources have emitted. Take the results here and
            // transform them into another object to return; this example packs them into an Object[].
            // The return type of apply is generic, so you can choose what to return.
            return new Object[]{name, sessionId};
        }
    })
    .subscribeOn(Schedulers.io()) // starts this entire chain on an IO thread
    .observeOn(AndroidSchedulers.mainThread()) // observeOn switches threads, so everything downstream of this point runs on the main thread
    .subscribeWith(new DisposableObserver<Object>() {
        @Override
        public void onNext(Object finalResult) {
            // the final result, containing all the API results, arrives here
        }
        @Override
        public void onError(Throwable e) {
            // any error raised anywhere in the chain ends up here
        }
        @Override
        public void onComplete() {
            // called once the whole chain has completed and terminated
        }
    });
You can even pass a list of observables to zip, as shown below:
List<Observable<String>> requests = new ArrayList<>();
requests.add(getNameRequest());
requests.add(getSessionIdRequest());
Observable.zip(requests, new Function<Object[], Object[]>() {
        @Override
        public Object[] apply(Object[] objects) throws Exception {
            // objects holds one result per source, in the same order as the list
            return new Object[]{objects[0], objects[1]};
        }
    }).subscribeWith(new DisposableObserver<Object[]>() {
        @Override
        public void onNext(Object[] objects) {
            // combined results of all requests
        }
        @Override
        public void onError(Throwable e) {
            // a failure in any request ends up here
        }
        @Override
        public void onComplete() {
        }
    });
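For readers who want to see the "start every call, continue once all results are in" behaviour without an RxJava dependency, here is a minimal sketch using only the JDK. It swaps zip for CompletableFuture.allOf, so it illustrates the same idea rather than the RxJava API. The names fetchName, fetchSessionId, callAll, and ZipLikeDemo are hypothetical and do not come from the original post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class ZipLikeDemo {
    // Hypothetical stand-ins for the downstream API calls.
    static String fetchName() { return "alice"; }
    static String fetchSessionId() { return "session-42"; }

    // Submits every call to the pool right away, then completes with all
    // results (in list order) once the last call has finished.
    static CompletableFuture<Object[]> callAll(List<Supplier<String>> calls, ExecutorService pool) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (Supplier<String> call : calls) {
            futures.add(CompletableFuture.supplyAsync(call, pool));
        }
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                // every future is already complete here, so join() does not block
                .thenApply(ignored -> futures.stream().map(CompletableFuture::join).toArray());
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Supplier<String>> calls = new ArrayList<>();
            calls.add(ZipLikeDemo::fetchName);
            calls.add(ZipLikeDemo::fetchSessionId);
            Object[] results = callAll(calls, pool).join();
            System.out.println(results[0] + " " + results[1]); // prints "alice session-42"
        } finally {
            pool.shutdown();
        }
    }
}
```

If the results really are not needed (fire and forget), the join() in main can be dropped: the calls have already been submitted by the time callAll returns, and shutdown() lets submitted tasks run to completion.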