根据 Id 组合 Rx 流

Combine Rx Streams based on Id

我有 2 个流发射对象,一个是 A 类型,另一个是 B 类型。它们都包含我们称之为 id.

的字段
Observable<A> observableA = // by calling service A
Observable<B> observableB = // by calling service B

我想将这两个流组合成一个对象流 C,它将包含来自两个对象的数据。

Observable<C> observableC = //Some fuction to merge observableA and observableB

对象从 observableA 和 observableB 发出的顺序可以不同。例如,observableA 可以先发出 id 为 1 的对象,但 observableB 可能会发出 id 为 2 的对象。

我发现的一种方法是在发出时将元素放在 2 个不同的地图(MapA 和 MapB)中,并在两个地图中都存在值时发出 C

Map<String,A> mapA = new HashMap<>();
Map<String,B> mapB = new HashMap<>();

Observable<C> observableC = Observable.merge(observableA,observableB).map(obj->{
 if(obj instanceof A) {
  if(mapB.containsKey(((A)obj).getId()){
      //create object C
  } else {
      mapA.put(((A)obj).getId());
  }
 }
 if(obj instanceof B){
   // same checks as above
 })

我想知道是否有任何更简洁的方法来执行此操作(可能使用一些 rx java 运算符)。

您可以为此使用 groupBy 运算符:

    Observable.merge(s1, s2)
        .groupBy(obj -> obj.getId())
        .flatMapSingle(grp -> grp.toList()
            .map(l -> /* create object C here */)
        )
        .subscribe();

groupBy 会将您的源 Observable 实例分成单独的 GroupedObservable 个实例,每组一个实例,由您传递给的键选择器函数返回的值决定groupBy。在这种情况下,对于 getId 的所有调用返回的每个唯一 ID,您都会得到一个 GroupedObservable。从那里,我们将共享 ID 的所有对象组合到一个列表中。届时,您可以自由使用所有这些对象来创建 C.

的实例