根据 Id 组合 Rx 流
Combine Rx Streams based on Id
我有 2 个流发射对象,一个是 A
类型,另一个是 B
类型。它们都包含我们称之为 id
.
的字段
Observable<A> observableA = // by calling service A
Observable<B> observableB = // by calling service B
我想将这两个流组合成一个对象流 C
,它将包含来自两个对象的数据。
Observable<C> observableC = //Some fuction to merge observableA and observableB
对象从 observableA 和 observableB 发出的顺序可以不同。例如,observableA 可以先发出 id 为 1 的对象,但 observableB 可能会发出 id 为 2 的对象。
我发现的一种方法是在发出时将元素放在 2 个不同的地图(MapA 和 MapB)中,并在两个地图中都存在值时发出 C
。
Map<String,A> mapA = new HashMap<>();
Map<String,B> mapB = new HashMap<>();
Observable<C> observableC = Observable.merge(observableA,observableB).map(obj->{
if(obj instanceof A) {
if(mapB.containsKey(((A)obj).getId()){
//create object C
} else {
mapA.put(((A)obj).getId());
}
}
if(obj instanceof B){
// same checks as above
})
我想知道是否有任何更简洁的方法来执行此操作(可能使用一些 rx java 运算符)。
您可以为此使用 groupBy
运算符:
Observable.merge(s1, s2)
.groupBy(obj -> obj.getId())
.flatMapSingle(grp -> grp.toList()
.map(l -> /* create object C here */)
)
.subscribe();
groupBy
会将您的源 Observable
实例分成单独的 GroupedObservable
个实例,每组一个实例,由您传递给的键选择器函数返回的值决定groupBy
。在这种情况下,对于 getId
的所有调用返回的每个唯一 ID,您都会得到一个 GroupedObservable
。从那里,我们将共享 ID 的所有对象组合到一个列表中。届时,您可以自由使用所有这些对象来创建 C
.
的实例
我有 2 个流发射对象,一个是 A
类型,另一个是 B
类型。它们都包含我们称之为 id
.
Observable<A> observableA = // by calling service A
Observable<B> observableB = // by calling service B
我想将这两个流组合成一个对象流 C
,它将包含来自两个对象的数据。
Observable<C> observableC = //Some fuction to merge observableA and observableB
对象从 observableA 和 observableB 发出的顺序可以不同。例如,observableA 可以先发出 id 为 1 的对象,但 observableB 可能会发出 id 为 2 的对象。
我发现的一种方法是在发出时将元素放在 2 个不同的地图(MapA 和 MapB)中,并在两个地图中都存在值时发出 C
。
Map<String,A> mapA = new HashMap<>();
Map<String,B> mapB = new HashMap<>();
Observable<C> observableC = Observable.merge(observableA,observableB).map(obj->{
if(obj instanceof A) {
if(mapB.containsKey(((A)obj).getId()){
//create object C
} else {
mapA.put(((A)obj).getId());
}
}
if(obj instanceof B){
// same checks as above
})
我想知道是否有任何更简洁的方法来执行此操作(可能使用一些 rx java 运算符)。
您可以为此使用 groupBy
运算符:
Observable.merge(s1, s2)
.groupBy(obj -> obj.getId())
.flatMapSingle(grp -> grp.toList()
.map(l -> /* create object C here */)
)
.subscribe();
groupBy
会将您的源 Observable
实例分成单独的 GroupedObservable
个实例,每组一个实例,由您传递给的键选择器函数返回的值决定groupBy
。在这种情况下,对于 getId
的所有调用返回的每个唯一 ID,您都会得到一个 GroupedObservable
。从那里,我们将共享 ID 的所有对象组合到一个列表中。届时,您可以自由使用所有这些对象来创建 C
.