Rx BehaviorSubject + 扫描将先前的事件推送给新订阅者?
Rx BehaviorSubject + scan pushing prior event to new subscriber?
我想要一个可以将减速器功能推送到的流。每次推送 reducer 函数时,应将状态对象传递给 reducer,reducer 应 return 修改后的状态值,并将更新后的状态推送给订阅者。我希望我的代码可以解释:
import Rx from 'rx';
import { Map } from 'immutable';
let initialState = Map({ counter: 0 });
export let upstream = new Rx.BehaviorSubject(Rx.helpers.identity);
export let downstream = upstream.scan((state, reducer) => {
return reducer(state);
}, initialState);
let increment = state => {
return state.update('counter', counter => counter + 1);
};
upstream.onNext(increment);
downstream.subscribe(state => {
console.log('subscriptionA', state.get('counter'));
});
upstream.onNext(increment);
setTimeout(() => {
downstream.subscribe(state => {
console.log('subscriptionB', state.get('counter'));
});
}, 3000);
这是我看到的输出:
subscriptionA 1
subscriptionA 2
subscriptionB 1
虽然我希望看到:
subscriptionA 1
subscriptionA 2
subscriptionB 2
显然我在这里遗漏了一些基本的东西。似乎 BehaviorSubject
应该为新订阅者保留最新值,这让我觉得当 subscriptionB
订阅 downstream
时它会获得最新的减少值,但它看起来像把 .scan
放在中间会弄脏东西……或者别的什么。
这是怎么回事,我该如何完成我想要完成的事情?谢谢!
我有一个解决方案似乎可以提供我正在寻找的结果。如果其他人可以验证它是一个合适的解决方案,我将不胜感激。
import Rx from 'rx';
import { Map } from 'immutable';
let initialState = Map({ counter: 0 });
export let upstream = new Rx.Subject();
let downstreamSource = upstream.scan((state, reducer) => {
return reducer(state);
}, initialState);
export let downstream = new Rx.BehaviorSubject(initialState);
downstreamSource.subscribe(downstream);
let increment = state => {
return state.update('counter', counter => counter + 1);
};
upstream.onNext(increment);
downstream.subscribe(state => {
console.log('subscriptionA', state.get('counter'));
});
upstream.onNext(increment);
setTimeout(() => {
downstream.subscribe(state => {
console.log('subscriptionB', state.get('counter'));
});
}, 3000);
你换了试试看是不是一切都符合你的预期
export let downstream = upstream.scan((state, reducer) => {
return reducer(state);
}, initialState);
来自
export let downstream = upstream.scan((state, reducer) => {
return reducer(state);
}, initialState).shareReplay(1);
jsfiddle 在这里:http://jsfiddle.net/cqaumutp/
如果是这样,那么您是 Rx.Observable
的冷热性质的另一个受害者,或者更准确地说,是可观察对象的惰性实例化。
简而言之(不是那么短),每次您执行 subscribe
时会发生的事情是,通过向上游操作符链创建可观察链。每个运算符都订阅其源,并且 returns 另一个可观察到起始源。在您的情况下,当您订阅 scan
时,scan
订阅最后一个 upstream
。 upstream
作为一个主题,在订阅时它只是注册订阅者。其他来源会做其他事情(比如在 DOM 节点、套接字或其他任何地方注册一个监听器)。
这里的重点是,每次您订阅 scan
,您都会重新开始,即从 initialState
开始。如果要使用 scan
的第一个订阅的值,则必须使用 share
运算符。在首次订阅 share
时,它会将您的订阅请求传递给 scan
。在第二个和后续的上,它不会,它会注册它,并将来自 scan
首先订阅的所有值转发给关联的观察者。
我想要一个可以将减速器功能推送到的流。每次推送 reducer 函数时,应将状态对象传递给 reducer,reducer 应 return 修改后的状态值,并将更新后的状态推送给订阅者。我希望我的代码可以解释:
import Rx from 'rx';
import { Map } from 'immutable';
let initialState = Map({ counter: 0 });
export let upstream = new Rx.BehaviorSubject(Rx.helpers.identity);
export let downstream = upstream.scan((state, reducer) => {
return reducer(state);
}, initialState);
let increment = state => {
return state.update('counter', counter => counter + 1);
};
upstream.onNext(increment);
downstream.subscribe(state => {
console.log('subscriptionA', state.get('counter'));
});
upstream.onNext(increment);
setTimeout(() => {
downstream.subscribe(state => {
console.log('subscriptionB', state.get('counter'));
});
}, 3000);
这是我看到的输出:
subscriptionA 1
subscriptionA 2
subscriptionB 1
虽然我希望看到:
subscriptionA 1
subscriptionA 2
subscriptionB 2
显然我在这里遗漏了一些基本的东西。似乎 BehaviorSubject
应该为新订阅者保留最新值,这让我觉得当 subscriptionB
订阅 downstream
时它会获得最新的减少值,但它看起来像把 .scan
放在中间会弄脏东西……或者别的什么。
这是怎么回事,我该如何完成我想要完成的事情?谢谢!
我有一个解决方案似乎可以提供我正在寻找的结果。如果其他人可以验证它是一个合适的解决方案,我将不胜感激。
import Rx from 'rx';
import { Map } from 'immutable';
let initialState = Map({ counter: 0 });
export let upstream = new Rx.Subject();
let downstreamSource = upstream.scan((state, reducer) => {
return reducer(state);
}, initialState);
export let downstream = new Rx.BehaviorSubject(initialState);
downstreamSource.subscribe(downstream);
let increment = state => {
return state.update('counter', counter => counter + 1);
};
upstream.onNext(increment);
downstream.subscribe(state => {
console.log('subscriptionA', state.get('counter'));
});
upstream.onNext(increment);
setTimeout(() => {
downstream.subscribe(state => {
console.log('subscriptionB', state.get('counter'));
});
}, 3000);
你换了试试看是不是一切都符合你的预期
export let downstream = upstream.scan((state, reducer) => {
return reducer(state);
}, initialState);
来自
export let downstream = upstream.scan((state, reducer) => {
return reducer(state);
}, initialState).shareReplay(1);
jsfiddle 在这里:http://jsfiddle.net/cqaumutp/
如果是这样,那么您是 Rx.Observable
的冷热性质的另一个受害者,或者更准确地说,是可观察对象的惰性实例化。
简而言之(不是那么短),每次您执行 subscribe
时会发生的事情是,通过向上游操作符链创建可观察链。每个运算符都订阅其源,并且 returns 另一个可观察到起始源。在您的情况下,当您订阅 scan
时,scan
订阅最后一个 upstream
。 upstream
作为一个主题,在订阅时它只是注册订阅者。其他来源会做其他事情(比如在 DOM 节点、套接字或其他任何地方注册一个监听器)。
这里的重点是,每次您订阅 scan
,您都会重新开始,即从 initialState
开始。如果要使用 scan
的第一个订阅的值,则必须使用 share
运算符。在首次订阅 share
时,它会将您的订阅请求传递给 scan
。在第二个和后续的上,它不会,它会注册它,并将来自 scan
首先订阅的所有值转发给关联的观察者。