RxJS Observable:当对象的特定部分发生变化时发出

RxJS Observable: Emit when specific part of the object changes

我有一个包含子对象和数组等的复杂对象。

我可以轻松地从这个对象创建一个 Subject/BehaviourSubject 或 Observable,这样我就可以 "next"(发送)新状态给订阅者。

[例如:let appEnv$ = new Rx.BehaviorSubject<IWebSocketAppEnv>(appEnv);]

但是我不希望每次对象更改时都通知我的所有订阅者。例如,对于我的一位订户,我只想在该对象的数组元素发生更改时收到通知。

其实我想要的是redux已经在做的东西。在 redux 中,我可以订阅商店但只选择一个子元素。

我想为我的后端 websocket 服务器应用程序实现相同的基础结构。

我如何使用 RxJS 实现它?

我们可以根据条件部分订阅源可观察对象(在您的情况下是复杂对象)来解决它。

Rxjs takeWhile 有助于做到这一点。 只有满足一定的条件才能进行订阅,然后处理发出的输出。

https://www.learnrxjs.io/operators/filtering/takewhile.html

使用 distinctUntilChanged 运算符。 (docs)

我们假设对象的结构是:

{
  articleId: 0,
  comments: [ 'lorem', 'ipsum' ]
}

我们想和 firebase 的家伙一样酷,所以当更新的对象有不同的数组时,我们会实时更新评论。

我将 RxJS5 用于可观察对象并使用 lodash 来比较数组,因为运算符的默认行为不会按照我们希望的方式进行比较。

// Our BehaviorSubject which emits new object.
// For those who don't know: it'll emit the latest emitted value when subscribing to it.
const object$ = new Rx.BehaviorSubject({
  articleId: 0,
  comments: [],
}); 

// If you want specific parts of your application 
// to react only when a specific part of the object has changed, you 
// have to create another observable using 'map' and 
// 'distinctUntilChanged' operator and subscribe to it.
const comments$ = object$
  .map(object => object.comments) // we want to emit comments only
  .distinctUntilChanged((a, b) => _.isEqual(a, b)); // emit only when currently emitted value is different than previous one.

comments$.subscribe(v => console.log(v)); // log fresh comments

object$.next({
  articleId: 1,
  comments: [],
});

object$.next({
  articleId: 1,
  comments: ['lorem', 'ipsum'],
});

object$.next({
  articleId: 2,
  comments: ['lorem', 'ipsum'],
});

object$.next({
  articleId: 2,
  comments: ['lorem', 'ipsum', 'dolor', 'sit', 'amet'],
});

作用:

[]
["lorem", "ipsum"]
["lorem", "ipsum", "dolor", "sit", "amet"]

没有distinctUntilChanged运算符的效果:

[]
[]
["lorem", "ipsum"]
["lorem", "ipsum"]
["lorem", "ipsum", "dolor", "sit", "amet"]

JSFiddle