Rx(JS):如何更新 flatMap 流中的订阅

Rx (JS): how to update subscriptions in flatMap stream

我有一个 state$ 流,其中包含 messages$s,它是 messages$ 流的数组。 State$ 更新,新的 messages$ 出现。

我希望订阅者在一个单一的流中处理来自所有消息 $ 的消息,我希望这个流只包含正确的事件。

我每次都尝试 flatMap 合并 messages$,但遇到了旧 messages$s(在以前的 states$ 值中)被多次订阅的问题。

我该如何解决这个问题?

let allMessages$ = state$.flatMap(s => { return Observable.merge(s.messages$s) } ) allMessages$.subscribe((x)=>{ console.log('message', x) // message from single message$ appear multiple times })

问题是在更新 state$ 之后(推送项目)旧的被多次订阅。 state$ --s(1)---------s(2)---- message$s[0]. --m1----m2-----------m4-- message$s[1] ---------------m3-------- allMessages$ --m1----m2-----m3----m4 m1 m4

s(1) - 当状态有 1 个 message$ 时,s(2) 当添加第二个 message$ 时 因此 allMessages$ 会触发来自 item1 的消息。

我想要的是: state$ --s(1)---------s(2)----- message$s[0] --m1----m2-----------m4-- message$s[1] ---------------m3-------- allMessages$ --m1----m2-----m3----m4

此文件显示了简化的情况: http://jsfiddle.net/8jFJH/797/

根据您的简化情况,这些是订阅序列(您可以查看答案 以了解热与冷可观察量的解释以及对订阅流程的理解):

  • 排放 state1
    • 订阅 typing$
  • 排放 state2
    • 订阅 typing$
    • 订阅 typing2$

因为您使用 flatMap,您同时拥有三个订阅。如果你使用 flatMapLatest 会发生什么:

  • 排放 state1
    • 订阅 typing$
  • 排放 state2
    • 'unsubscription'(甚至是英语)来自 flatMapLatest 中发出的先前流,即 Rx.Observable.merge(state.items)typing$
    • 订阅 typing$
    • 订阅 typing2$

所以请尝试用 flatMapLatest 替换 flatMap 并告诉我是否可以解决问题。

解决这个问题的另一种方法也可以使用状态更改流而不是整个状态(redux 为 React 所做的事情)。