使用自定义功能扩展 Observable

Extending Observable with custom functionality

我的情况:我有一个数据存储,我可以在其中获取可以使用 RxJS Observables 观察的模型。一般的类型签名是

const foo = new Model({id: 123}, dataStore);
foo.asObservable().subscribe((v) => /* do stuff with values of this model */ );

一般的观点是,像对待可观察对象一样处理数据有两种帮助:(a) 当您遇到冷缓存情况(例如浏览器的 localStorage 有一些 out-of-date 数据)但仍想显示时在获取实际数据时,以及 (b) 当您已经加载了正确的数据,但在本地或后端对其进行更改时需要向上传播。到目前为止,这工作得相当好。

next-level 问题是模型与其他模型有关系(例如,parents 有很多 children)。按照惯例,我可以做类似

的事情
foo.asObservable().subscribe(v => console.log(v.relationships.children))

和(此处忽略空错误),我最初得到 [1, 2, 3],然后当 4 添加到 parent-child-relationship 时得到 [1, 2, 3, 4]。到目前为止和我在一起吗?

问题是我经常想访问这些 children,不是作为索引,而是作为它们本身的可观察模型(所以我可以显示用户社区所有成员的名字,例如,社区和所有成员都是数据模型)。我目前在我的控制器代码中使用大量样板来执行此操作,涉及对 .combineLatest.

的大量调用

我想要做的是为这种类型的 objects 定义一个自定义运算符,这样我就可以将它们全部放在一起。理想情况下它看起来像:

foo.asObservable().inflateRelationship('members').subscribe(
  (v) => // v === [{name: 'steve'}, {name: 'gertude'} ...] etc
);

我实际上已经部分工作了,但问题是启动了实际的链。我正在关注 instructions for extending Observable,创建一个实现 lift 的新 CustomObservable class,但我的问题是我不能在这里使用静态 Observable 方法,例如 Observable.merge( ), 在 Model.asObservable.

中生成我的初始观察值

我的问题来了:

const preload$ = Observable... 
  // create the "load from cache and backend observable"
const update$ = Observable ... 
  // create the "update after load when the storage updates observable" 

return new CustomObservable(context).merge(preload$, update$);

最后一行失败了。我想 return 这个 CustomObservable 由两个常规可观察流上的合并运算符制成。我需要将上下文添加到那里的构造函数,因为该上下文包含对实际膨胀 child 模型所需的数据存储的引用(没有它,id 数组流有点毫无意义)。

这就是我的具体问题:我创建了一个 Observable 运算符,我想将其作为 class 添加到 CustomObservable,因此我可以像平常一样使用下游运算符,但我似乎不能正确启动整个链条。

任何指针,甚至是正确(和non-trivially)扩展 Observable class 的现有项目都将受到欢迎。我尝试深入研究源代码,但我什至无法弄清楚那部分(看起来 Observable class statics 被添加到其他地方,乍一看是 super-unclear ,因为没有定义在 Observable class itself).

这就是其中一种 "write a really long question to stack overflow and figure out the answer shortly after you've posted it" 情况,但我想我会写下答案只是为了后代。

假设您已按照说明对 Operator 进行子类化,您要做的是

asObservable() {
  // do a bunch of stuff making different things
  return Observable.merge(one$, two$)
    .let(obs => new CustomObservable(context, obs);

然后在 CustomObservable 中你有

class CustomObservable extends Observable {
  constructor(context, source) {
    super();
    this.source = source;
  }
  customOperator() {}
  lift(operator) {
    const obs = new CustomObservable(context, this);
    obs.operator = operator;
    return obs;
  }
}

这让我可以做到

Model.asObservable()
.filter() // normal RxJs operator here
.customOperator() // yay
.map() // back to other RxJs operators
.subscribe(v => console.log(v)) // or whatever

所以,是的。现在我的 angular 模型可以看起来更空闲了。