使用像 merge 这样的 RxJs 运算符,但在结果中跟踪源 observables?

Use RxJs operator like merge but keep track of source observables in the result?

我想像使用“合并”运算符一样合并可观察对象,但我仍然希望能够知道发出了哪个输入可观察对象,有没有办法做到这一点?

例如:

private result$ = merge(this.obs1$, this.obs2$).pipe(
    scan((result, change) => index + change, 0),
    shareReplay(1)
  );

只要任何输入可观察量发出,来自 obs1 和 obs2 的两个值都将进入扫描函数中的“更改”变量,但是如果我可以访问投影函数,我可以在其中标记来自输入的值具有不同名称的 observables 然后我可以在下面的扫描函数中做不同的事情,具体取决于发射的输入 observable。其他运算符(如 CombineLatest 或 ForkJoin)似乎不适用于此处,因为它们需要完成或从所有输入可观察量中发出。

如果您需要跟踪发出了哪个输入可观察对象,那么您可能需要将元数据添加到您的源可观察对象中。在不知道如何使用 result$ 的上下文的情况下,这是给定信息的最佳解决方案。

我建议为您需要跟踪的每个可观察对象添加一个 id 属性。然后,您可以根据 ID 在扫描运算符中使用一些策略。

下面是一个简单示例,对每个可观察源使用 id。在 scan 运算符中,您将看到我的策略如何根据 ID 发生变化。

import { interval, merge, of } from "rxjs";
import { map, scan, shareReplay } from "rxjs/operators";

const obs1$ = interval(1000).pipe(map(i => ({ i, id: "obs1" })));
const obs2$ = interval(3000).pipe(map(i => ({ i, id: "obs2" })));

let index = 0;

const result$ = merge(obs1$, obs2$).pipe(
  scan((result, change) => {
    if (change.id === "obs1") {
      return index + change.i;
    }

    if (change.id === "obs2") {
      return index + change.i * 2;
    }
  }, 0),
  shareReplay(1)
);

result$.subscribe(console.log);

https://stackblitz.com/edit/rxjs-as5ket

@react-rxjs/utilsa util named mergeWithKey 可以像这样使用:

import { Subject } from "rxjs"
import { scan, startWith } from 'rxjs/operators'
import { mergeWithKey } from '@react-rxjs/utils'

const inc$ = new Subject()
const dec$ = new Subject()
const resetTo$ = new Subject<number>()

const counter$ = mergeWithKey({
  inc$,
  dec$,
  resetTo$,
}).pipe(
  scan((acc, current) => {
    switch (current.type) {
      case "inc$":
        return acc + 1
      case "dec$":
        return acc - 1
      case "resetTo$":
        return current.payload
      default:
        return acc
    }
  }, 0),
  startWith(0),
)

实现非常简单:


import { merge, Observable, ObservableInput, from, SchedulerLike } from "rxjs"
import { map } from "rxjs/operators"

/**
 * Emits the values from all the streams of the provided object, in a result
 * which provides the key of the stream of that emission.
 *
 * @param input object of streams
 */
export const mergeWithKey: <
  O extends { [P in keyof any]: ObservableInput<any> },
  OT extends {
    [K in keyof O]: O[K] extends ObservableInput<infer V>
      ? { type: K; payload: V }
      : unknown
  }
>(
  x: O,
  concurrent?: number,
  scheduler?: SchedulerLike,
) => Observable<OT[keyof O]> = (input, ...optionalArgs) =>
  merge(
    ...(Object.entries(input)
      .map(
        ([type, stream]) =>
          from(stream).pipe(
            map((payload) => ({ type, payload } as any)),
          ) as any,
      )
      .concat(optionalArgs) as any[]),
  )

这是您需要的吗?