如何在 Rx.js 中的油门中的每个值后重置 window 持续时间?
How to reset window duration after each value in throttle in Rx.js?
我希望在生成新值时收到通知,但前提是自上一个值生成以来已经过去了一段时间。
例如如果源开始产生值(在 t=0s)并每秒产生新值,持续 10 秒,然后停止 5 秒,然后再次开始(在 t=15s),假设 window 持续时间设置为更少超过 5 秒,我只想收到有关 t=0 和 t=15 处的值的通知(在它们生成后立即)。类似于油门,但每个新值都会重置 window 持续时间。
这是匆忙完成的,但你似乎在工作:jsfiddle
思路是从第一个值开始,以指定的持续时间去抖动,并在去抖动后取下一个值。 source
必须分享,因为它将被多次订阅。
var ta_message = document.getElementById('ta_message');
var ta_intermediary = document.getElementById('ta_intermediary');
var ta_result = document.getElementById('ta_result');
function emits ( who, who_ ) {return function ( x ) {
who.innerHTML = [who.innerHTML, who_ + " emits " + JSON.stringify(x)].join("\n");
};}
var count = 0;
var Xms = 1700;
var source$ = Rx.Observable
.fromEvent(document.getElementById('source'), 'click')
.map(function(){return ++count;})
.do(emits(ta_message, 'count'))
.share();
var firstNextItem$ = source$
.debounce(Xms)
.flatMapLatest(function(){return source$.take(1);})
.do(emits(ta_intermediary, 'first value after Xms pause'))
;
var result$ = Rx.Observable.merge(source$.take(1), firstNextItem$)
.do(emits(ta_result, 'result'))
;
result$.subscribe(function(){});
我希望在生成新值时收到通知,但前提是自上一个值生成以来已经过去了一段时间。
例如如果源开始产生值(在 t=0s)并每秒产生新值,持续 10 秒,然后停止 5 秒,然后再次开始(在 t=15s),假设 window 持续时间设置为更少超过 5 秒,我只想收到有关 t=0 和 t=15 处的值的通知(在它们生成后立即)。类似于油门,但每个新值都会重置 window 持续时间。
这是匆忙完成的,但你似乎在工作:jsfiddle
思路是从第一个值开始,以指定的持续时间去抖动,并在去抖动后取下一个值。 source
必须分享,因为它将被多次订阅。
var ta_message = document.getElementById('ta_message');
var ta_intermediary = document.getElementById('ta_intermediary');
var ta_result = document.getElementById('ta_result');
function emits ( who, who_ ) {return function ( x ) {
who.innerHTML = [who.innerHTML, who_ + " emits " + JSON.stringify(x)].join("\n");
};}
var count = 0;
var Xms = 1700;
var source$ = Rx.Observable
.fromEvent(document.getElementById('source'), 'click')
.map(function(){return ++count;})
.do(emits(ta_message, 'count'))
.share();
var firstNextItem$ = source$
.debounce(Xms)
.flatMapLatest(function(){return source$.take(1);})
.do(emits(ta_intermediary, 'first value after Xms pause'))
;
var result$ = Rx.Observable.merge(source$.take(1), firstNextItem$)
.do(emits(ta_result, 'result'))
;
result$.subscribe(function(){});