等待多个 observable 完成 return 个不同数量的元素
Waiting for multiple observable to complete that return different number of elements
场景:我有一个 customerID 字符串,用于查询多个不同的后端系统:日历、帮助台、ERP、CRM 等。我想编制一个报告。
所以我大致有 (psydocode):
Result myResult = new Result();
Observable<Cal> cal = Calbackend.get(customerid);
cal.subscribe(calentry -> myResult.addCal(calentry));
Observable<Erp> erp = ERPbackend.get(customerid);
erp.subscribe(erpentry -> myResult.addErp(erpentry));
Observable<Help> help = Helpbackend.get(customerid);
help.subscribe(helpentry -> myResult.addHelp(helpentry));
Observable<Crm> crm = CRMbackend.get(customerid);
crm.subscribe(crmentry -> myResult.addCrm(crmentry));
// Magic here?
return result;
我想到的方法是:用defer()
来阻止启动,然后为每个额外订阅count()
。然后我可以压缩 count 元素,因为它们每个只会发出一个项目(而其他元素将有不同数量的事件)。但是,如果 myResult.add
的执行速度比 count()
慢,这可能会导致数据丢失。
我想到的另一个选择是为每个订阅设置一个布尔标志数组,并检查每个完成(和错误)事件(如果所有事件都已完成)并执行回调或对该事件使用阻塞.
我看了看 and here,但这些示例处理的是常数或数据类型。
或者有更好/推荐的方法吗?
运算符 toList
可以像这样与 zip
一起使用:
Observable<List<Cal>> cal = Calbackend.get(customerid).toList();
Observable<List<Erp>> erp = ERPbackend.get(customerid).toList();
Observable<List<Help>> help = Helpbackend.get(customerid).toList();
Observable<List<Crm>> crm = CRMbackend.get(customerid).toList();
Observable.zip(cal, erp, help, crm,
new Func4<List<Cal>, List<Erp>, List<Help>, List<Crm>, Result>() {
@Override
public Result call(List<Cal> cals, List<Erp> erps, List<Help> helps, List<Crm> crms) {
Result myResult = new Result();
// add all cals, erps, helps and crms to result
return myResult;
}
})
.subscribe(new Subscriber<Result>() {
@Override
public void onNext(Result result) {
// do something with the result
}
...
});
解释: 顾名思义,toList
运算符创建一个由源可观察对象发射的项目列表(该列表仅发射一次,当source observable 完成),然后 zip
用于组合 observable 的结果。
编辑: 如果这些 Observable 可能会发出错误,您可以使用 onErrorReturn
来保持正常流程:
Observable<List<Cal>> cal = Calbackend.get(customerid)
.onErrorReturn(new Func1<Throwable, Cal>() {
@Override
public Cal call(Throwable throwable) {
// Return something in the error case
return null;
}
})
.toList();
场景:我有一个 customerID 字符串,用于查询多个不同的后端系统:日历、帮助台、ERP、CRM 等。我想编制一个报告。 所以我大致有 (psydocode):
Result myResult = new Result();
Observable<Cal> cal = Calbackend.get(customerid);
cal.subscribe(calentry -> myResult.addCal(calentry));
Observable<Erp> erp = ERPbackend.get(customerid);
erp.subscribe(erpentry -> myResult.addErp(erpentry));
Observable<Help> help = Helpbackend.get(customerid);
help.subscribe(helpentry -> myResult.addHelp(helpentry));
Observable<Crm> crm = CRMbackend.get(customerid);
crm.subscribe(crmentry -> myResult.addCrm(crmentry));
// Magic here?
return result;
我想到的方法是:用defer()
来阻止启动,然后为每个额外订阅count()
。然后我可以压缩 count 元素,因为它们每个只会发出一个项目(而其他元素将有不同数量的事件)。但是,如果 myResult.add
的执行速度比 count()
慢,这可能会导致数据丢失。
我想到的另一个选择是为每个订阅设置一个布尔标志数组,并检查每个完成(和错误)事件(如果所有事件都已完成)并执行回调或对该事件使用阻塞.
我看了看
或者有更好/推荐的方法吗?
运算符 toList
可以像这样与 zip
一起使用:
Observable<List<Cal>> cal = Calbackend.get(customerid).toList();
Observable<List<Erp>> erp = ERPbackend.get(customerid).toList();
Observable<List<Help>> help = Helpbackend.get(customerid).toList();
Observable<List<Crm>> crm = CRMbackend.get(customerid).toList();
Observable.zip(cal, erp, help, crm,
new Func4<List<Cal>, List<Erp>, List<Help>, List<Crm>, Result>() {
@Override
public Result call(List<Cal> cals, List<Erp> erps, List<Help> helps, List<Crm> crms) {
Result myResult = new Result();
// add all cals, erps, helps and crms to result
return myResult;
}
})
.subscribe(new Subscriber<Result>() {
@Override
public void onNext(Result result) {
// do something with the result
}
...
});
解释: 顾名思义,toList
运算符创建一个由源可观察对象发射的项目列表(该列表仅发射一次,当source observable 完成),然后 zip
用于组合 observable 的结果。
编辑: 如果这些 Observable 可能会发出错误,您可以使用 onErrorReturn
来保持正常流程:
Observable<List<Cal>> cal = Calbackend.get(customerid)
.onErrorReturn(new Func1<Throwable, Cal>() {
@Override
public Cal call(Throwable throwable) {
// Return something in the error case
return null;
}
})
.toList();