如何让 RxJS 可观察管道访问原始可观察发射和管道之前的发射?

How can I give an RxJS observable pipe access to the original observable's emission AND the pipe's previous emission?

我有一个 RxJS Observable,它向底层数据结构发出一系列变化——具体来说,snapshotChanges() from an AngularFirestoreCollection

我想做的是使用 Immer 来维护一个不可变的结构,这样未更改的数据在结构上与“新”数组共享。

我无法解决的是如何 pipe() 关闭 snapshotChanges() observable 以便管道可以访问先前发出的不可变数据(或首次默认)除了最新的snapshotChanges()输出。

在代码中,我基本上已经有了:

const docToObject = (doc) => { /* change document to fresh plain object every time */ };
const mappedData$ = snapshotChanges().pipe(
    map(changes => changes.map(change => docToObject(change.payload.doc)),
    tap(array => console.log('mutable array:', array)),
);

我基本上是在寻找这样的东西,但我不知道 XXX(...) 应该是什么:

const newImmutableObject = (changes, old) => {
  // new immutable structure from old one + changes, structurally sharing as much as
  // possible
};
const mappedData$ = snapshotChanges().pipe(

// ==================================================================================
    XXX(...), // missing ingredient to combine snapshotChanges and previously emitted
              // value, or default to []
// ==================================================================================

    map(([snapshotChanges, prevImmutableOutput]) => newImmutableOutput(...)),
    tap(array => console.log('IMMUTABLE ARRAY with shared structure:', array)),
);

我觉得 the expand operator 接近我需要的,但它似乎只在后续运行中传递先前发出的值,而我还需要新发出的 snapshotChanges.

给定一个 RxJS Observable 管道,我怎样才能对这个 Observable 的发射进行操作,同时还能访问管道之前的发射?

根据您的要求,我建议使用 scan 运算符,它可以跟踪所有以前的状态和新状态。

const newImmutableObject = (changes, old) => {
  // new immutable structure from old one + changes, structurally sharing as much as
  // possible
};
 const mappedData$ = snapshotChanges().pipe(
 scan((acc, current) => [...acc, current], []), //<-- scan is used here
 map(([snapshotChanges, prevImmutableOutput]) => newImmutableOutput(...)),
    tap(array => console.log('IMMUTABLE ARRAY with shared structure:', array)),
);