RxJava:如何压缩两个 Observable,然后合并它们并最终合并所有结果
RxJava: How to .zip two Observable, then .merge them and eventually .reduce to aggregate all results
我有以下代码:
public void foo() {
Long[] gData = new Long[] { 1L, 2L };
rx.Observable.from(gData)
.concatMap(data -> {
rx.Observable<GmObject> depositObs1 = depositToUserBalance(data, 1);
rx.Observable<GmObject> depositObs2 = depositToUserBalance(data, 2);
return rx.Observable.zip(depositObs1, depositObs2, (depositObj1, depositObj2) -> {
depositObj1.putNumber("seat_index", data);
depositObj2.putNumber("seat_index", data);
return rx.Observable.merge(
rx.Observable.just(depositObj1),
rx.Observable.just(depositObj2));
})
})
.reduce(new ArrayList<Long>(), (payoutArr, payoutObj) -> {
int seatIndex = ((GmObject) payoutObj).getNumber("seat_index").intValue();
long payout = ((GmObject) payoutObj).getNumber("payout").longValue();
payoutArr.add(seatIndex, payout);
return payoutArr;
})
.subscribe(results -> {
System.out.println(results);
});
}
此代码使用 .zip 发送到 observables,然后它添加一个 'seat_index' 属性 并调用 .merge 以使用 .reduce 因此最终所有结果将聚合到一个 ArrayList .
这段代码有一个问题:当 .reduce 处理它的输入时,它把它作为 Observable 而不是 GmObject ...什么函数可以 'extract' GmObject 从它的 Observable 包装?
这样使用 rxJava 有意义吗?或者有更好的技术?
谢谢!
zip
运算符将 lambda 作为第三个参数。这个 lambda 是一个 2 args 函数,它 return 一个由 args 组成的对象。而不是组合结果的 Observable
(但是,当然,对象可以是 Observable
,但在您的情况下这不是您想要的)。
所以在你的 zip
调用之后,你将得到一个 Observable<Observable<GmObject>>
但你期望一个 Observable<GmObject>
.
我认为 zip
运算符不是您要查找的运算符。
public void foo() {
Long[] gData = new Long[] { 1L, 2L };
rx.Observable.from(gData)
.concatMap(data -> {
rx.Observable<GmObject> depositObs1 = depositToUserBalance(data, 1).doOnNext(obj -> obj.putNumber("seat_index", data));
rx.Observable<GmObject> depositObs2 = depositToUserBalance(data, 2).doOnNext(obj -> obj.putNumber("seat_index", data));
return rx.Observable.merge(depositObs1, depositObs2);
})
.reduce(new ArrayList<Long>(), (payoutArr, payoutObj) -> {
int seatIndex = ((GmObject) payoutObj).getNumber("seat_index").intValue();
long payout = ((GmObject) payoutObj).getNumber("payout").longValue();
payoutArr.add(seatIndex, payout);
return payoutArr;
})
.subscribe(results -> System.out.println(results));
}
我有以下代码:
public void foo() {
Long[] gData = new Long[] { 1L, 2L };
rx.Observable.from(gData)
.concatMap(data -> {
rx.Observable<GmObject> depositObs1 = depositToUserBalance(data, 1);
rx.Observable<GmObject> depositObs2 = depositToUserBalance(data, 2);
return rx.Observable.zip(depositObs1, depositObs2, (depositObj1, depositObj2) -> {
depositObj1.putNumber("seat_index", data);
depositObj2.putNumber("seat_index", data);
return rx.Observable.merge(
rx.Observable.just(depositObj1),
rx.Observable.just(depositObj2));
})
})
.reduce(new ArrayList<Long>(), (payoutArr, payoutObj) -> {
int seatIndex = ((GmObject) payoutObj).getNumber("seat_index").intValue();
long payout = ((GmObject) payoutObj).getNumber("payout").longValue();
payoutArr.add(seatIndex, payout);
return payoutArr;
})
.subscribe(results -> {
System.out.println(results);
});
}
此代码使用 .zip 发送到 observables,然后它添加一个 'seat_index' 属性 并调用 .merge 以使用 .reduce 因此最终所有结果将聚合到一个 ArrayList .
这段代码有一个问题:当 .reduce 处理它的输入时,它把它作为 Observable 而不是 GmObject ...什么函数可以 'extract' GmObject 从它的 Observable 包装?
这样使用 rxJava 有意义吗?或者有更好的技术?
谢谢!
zip
运算符将 lambda 作为第三个参数。这个 lambda 是一个 2 args 函数,它 return 一个由 args 组成的对象。而不是组合结果的 Observable
(但是,当然,对象可以是 Observable
,但在您的情况下这不是您想要的)。
所以在你的 zip
调用之后,你将得到一个 Observable<Observable<GmObject>>
但你期望一个 Observable<GmObject>
.
我认为 zip
运算符不是您要查找的运算符。
public void foo() {
Long[] gData = new Long[] { 1L, 2L };
rx.Observable.from(gData)
.concatMap(data -> {
rx.Observable<GmObject> depositObs1 = depositToUserBalance(data, 1).doOnNext(obj -> obj.putNumber("seat_index", data));
rx.Observable<GmObject> depositObs2 = depositToUserBalance(data, 2).doOnNext(obj -> obj.putNumber("seat_index", data));
return rx.Observable.merge(depositObs1, depositObs2);
})
.reduce(new ArrayList<Long>(), (payoutArr, payoutObj) -> {
int seatIndex = ((GmObject) payoutObj).getNumber("seat_index").intValue();
long payout = ((GmObject) payoutObj).getNumber("payout").longValue();
payoutArr.add(seatIndex, payout);
return payoutArr;
})
.subscribe(results -> System.out.println(results));
}