combineLatest 与可观察变量的可变计数

combineLatest with variable count of observables

我想要 combineLatest 功能,但要观察变量的可变数量。

类似于:

// init combineLatest of three observables
[1, 2, 3]
// first observable produced new value "2"
[2, 2, 3]
// third observable ended
[2, 2]
// first observable produced new value "1"
[1, 2]
// new observable added
[2, 2, 4]

在 RxJS 中可以吗?

如果我对问题的理解正确,对于看起来无辜的东西,解决方案是相当棘手的。

我试着一步一步解释一个可能的解决方案。

首先我们需要了解我们需要管理 3 个不同的事件:

  1. 一个 Observable 完成的事实
  2. 事实上,一个 Observable 被添加到给定 combineLatest
  3. 的数组中
  4. 必须将一个新的 Observable 数组传递给 combineLatest 的事实,要么是因为我们正处于处理的开始(即使用初始数组),要么是因为我们添加了一个新的 Observable 或因为一个 Observable 已经完成

我们需要认识到的第二件事是我们需要将我们传递给 combineLatest 的 Observables 数组存储在一个变量中,否则我们无法从那里添加或删除 Obaservables。

一旦这些事情都清楚了,我们就可以构建一个函数形式的解决方案,returns 2 件事:

  1. 我们想要订阅的 Observable 应该具有我们正在寻找的行为
  2. 一个主题,我们可以用来传达我们想要向 combineLatest 函数添加一个新的 Observable

我们需要认识到的最后一点是,任何时候我们更改 Observable 列表,无论是因为我们添加还是因为我们删除 Observable(因为它已完成),我们都需要 运行 combineLatest 函数与新的 Observables 列表。

现在所有这些都已经澄清了,这是 returns 一个 Observable 的函数代码,它的行为与描述的一样

function dynamicCombineLatest(startingObservables: Observable<any>[]) {
  // this is the variable holding the array of Observables
  let observables = startingObservables;

  // this is the array that contains the list of Observables which have been, potentially, transformed to emit
  // immediately the last value emitted - this happens when a new Observable is added to the array
  let observablesPotentiallyWithLastValueImmediatelyEmitted =
    startingObservables;

  // this is the variable holding the array of values last notified by each Observable
  // we will use it when we need to add a new Observable to the list
  const lastValues = [];

  // this are the Subjects used to notify the 3 different types of events
  const start = new BehaviorSubject<Observable<any>[]>(observables);
  const add = new Subject<Observable<any>>();
  const remove = new Subject<Observable<any>>();

  let skipFirst = false;

  // this is the chain of operations which must happen when a new Observable is added
  const addToObservables = add.pipe(
    tap({
      next: (obs) => {
        console.log("add");
        // we need to make sure that the Observables in the list will immediately start to emit
        // the last value they emitted. In this way we are sure that, as soon as the new added Observable emits somthing,
        // the last value emitted by the previous Observables will be considered
        observablesPotentiallyWithLastValueImmediatelyEmitted = observables.map(
          (o, i) => {
            return startWith(lastValues[i])(o);
          }
        );
        // the new Observable is added to the list
        observables.push(obs);
        observablesPotentiallyWithLastValueImmediatelyEmitted.push(obs);
      },
    })
  );
  // this is the chain of operations which must happen when an Observable is removed
  const removeFromObservables = remove.pipe(
    tap({
      next: (obs) => {
        const index =
          observablesPotentiallyWithLastValueImmediatelyEmitted.indexOf(obs);
        console.log("remove");
        // we simply remove the Observable from the list and it "last value"
        observablesPotentiallyWithLastValueImmediatelyEmitted.splice(index, 1);
        observables.splice(index, 1);
        lastValues.splice(index, 1);

        // we make sure that the Observables in the list will immediately start to emit with the last value they emitted
        observablesPotentiallyWithLastValueImmediatelyEmitted = observables.map(
          (o, i) => {
            return lastValues[i] ? startWith(lastValues[i])(o) : o;
          }
        );
        // we set that the first value of the new combineLatest Observable will be skipped
        skipFirst = true;
      },
    })
  );

  // here we merge the 2 chains of operations so that both add and remove logic will be executed
  // when the relative Subjects emit
  merge(addToObservables, removeFromObservables).subscribe({
    next: () => {
      console.log("new start");
      // we notify that a change in the Observable list has occurred and therefore we need to unsubscribe the previous "combineLatest"
      // and subscribe to the new one we are going to build
      start.next(observablesPotentiallyWithLastValueImmediatelyEmitted);
    },
  });

  // this is where we switch to a new Observable, result of the "combineLatest" operation,
  // any time the start Subject emits a new Observable list
  const dynamicObservables = start.pipe(
    switchMap((_observables) => {
      const _observablesSavingLastValueAndSignallingRemove = _observables.map(
        (o, i) =>
          o.pipe(
            tap({
              next: (v) => {
                // here we save the last value emitted by each Observable
                lastValues[i] = v;
              },
              complete: () => {
                // here we notify that the Observable has completed and we need to remove it from the list
                remove.next(o);
              },
            })
          )
      );
      console.log("add or remove");
      // eventually this is the Observable created by combineLatest with the expected array of Observables
      const _combineLatest = combineLatest(
        _observablesSavingLastValueAndSignallingRemove
      );
      const ret = skipFirst ? _combineLatest.pipe(skip(1)) : _combineLatest;
      skipFirst = false;
      return ret;
    })
  );

  // here we return the Observable which will be subscribed to and the add Subject to be used to add new Observables
  return { dynamicObservables, add };
}

你可以看看this stackblitz for an example.

根据键缓冲和组合

这里有一个与您的要求略有不同的版本。它的工作方式与 mergeAll 类似,只是它保留了一个缓冲区并为到目前为止已发出的任何可观察值发出最新消息。

这里的变体是您需要为要附加的值提供字符串键。如果您愿意,您应该能够看到如何将其转换为数组索引。

我没有对数组执行此操作的原因是没有太多未定义的行为。例如,如果第一个 observable 完成并且第二个 observable 发出,则您的元素都是不透明的 re-ordered.

使用键 returns 将控制权交还给调用者,如果他们不关心 indices/labels 的数据,他们可以只使用 Object.keys()

你在这里:


interface LabeledObservable<T> {
  label: string,
  stream: Observable<T>
}

interface CombinedLatest<T> {
  [key:string]: T
}

function combineLatestAll<T>(): 
  OperatorFunction<
    LabeledObservable<T>, 
    CombinedLatest<T>
  > 
{
  return source$ => defer(() => {

    const buffer = {};

    return source$.pipe(
      mergeMap(({label, stream}) => stream.pipe(
        map(v => {
          buffer[label] = v;
          return {...buffer};
        }),
        finalize(() => {
          delete buffer[label];
        })
      ))
    );

  });
}

新观察对象

如果您喜欢主题的想法,您可以使用它来将新的可观察对象注入到 combineLatest 运算符中,这仍然允许这样做。唯一需要的改变是您必须为您的可观察对象提供唯一的标签。如果你不关心标签,你可以使用任何 ID 生成器模式(比如递增全局 ID 计数器或其他东西)。

const startingObservables: Observable<any>[] = /*some observables */;
const add = new Subject<LabeledObservable<any>>();

add.pipe(
  combineLatestAll()
).subscribe(console.log);

startingObservables.forEach((stream,i) => {
  add.next({label: "" + i, stream});
});