Rx BehaviorSubject + 扫描将先前的事件推送给新订阅者?

Rx BehaviorSubject + scan pushing prior event to new subscriber?

我想要一个可以将减速器功能推送到的流。每次推送 reducer 函数时,应将状态对象传递给 reducer,reducer 应 return 修改后的状态值,并将更新后的状态推送给订阅者。我希望我的代码可以解释:

import Rx from 'rx';
import { Map } from 'immutable';

let initialState = Map({ counter: 0 });

export let upstream = new Rx.BehaviorSubject(Rx.helpers.identity);
export let downstream = upstream.scan((state, reducer) => {
  return reducer(state);
}, initialState);

let increment = state => {
  return state.update('counter', counter => counter + 1);
};

upstream.onNext(increment);

downstream.subscribe(state => {
  console.log('subscriptionA', state.get('counter'));
});

upstream.onNext(increment);

setTimeout(() => {
  downstream.subscribe(state => {
    console.log('subscriptionB', state.get('counter'));
  });
}, 3000);

这是我看到的输出:

subscriptionA 1
subscriptionA 2
subscriptionB 1

虽然我希望看到:

subscriptionA 1
subscriptionA 2
subscriptionB 2

显然我在这里遗漏了一些基本的东西。似乎 BehaviorSubject 应该为新订阅者保留最新值,这让我觉得当 subscriptionB 订阅 downstream 时它会获得最新的减少值,但它看起来像把 .scan 放在中间会弄脏东西……或者别的什么。

这是怎么回事,我该如何完成我想要完成的事情?谢谢!

我有一个解决方案似乎可以提供我正在寻找的结果。如果其他人可以验证它是一个合适的解决方案,我将不胜感激。

import Rx from 'rx';
import { Map } from 'immutable';

let initialState = Map({ counter: 0 });

export let upstream = new Rx.Subject();

let downstreamSource = upstream.scan((state, reducer) => {
  return reducer(state);
}, initialState);

export let downstream = new Rx.BehaviorSubject(initialState);
downstreamSource.subscribe(downstream);

let increment = state => {
  return state.update('counter', counter => counter + 1);
};

upstream.onNext(increment);

downstream.subscribe(state => {
  console.log('subscriptionA', state.get('counter'));
});

upstream.onNext(increment);

setTimeout(() => {
  downstream.subscribe(state => {
    console.log('subscriptionB', state.get('counter'));
  });
}, 3000);

你换了试试看是不是一切都符合你的预期

export let downstream = upstream.scan((state, reducer) => {
  return reducer(state);
}, initialState);

来自

export let downstream = upstream.scan((state, reducer) => {
  return reducer(state);
}, initialState).shareReplay(1);

jsfiddle 在这里:http://jsfiddle.net/cqaumutp/

如果是这样,那么您是 Rx.Observable 的冷热性质的另一个受害者,或者更准确地说,是可观察对象的惰性实例化。

简而言之(不是那么短),每次您执行 subscribe 时会发生的事情是,通过向上游操作符链创建可观察链。每个运算符都订阅其源,并且 returns 另一个可观察到起始源。在您的情况下,当您订阅 scan 时,scan 订阅最后一个 upstreamupstream 作为一个主题,在订阅时它只是注册订阅者。其他来源会做其他事情(比如在 DOM 节点、套接字或其他任何地方注册一个监听器)。

这里的重点是,每次您订阅 scan,您都会重新开始,即从 initialState 开始。如果要使用 scan 的第一个订阅的值,则必须使用 share 运算符。在首次订阅 share 时,它会将您的订阅请求传递给 scan。在第二个和后续的上,它不会,它会注册它,并将来自 scan 首先订阅的所有值转发给关联的观察者。