RXJS:聚合去抖
RXJS: Aggregated debounce
我的用例如下:我收到事件,这些事件有时会突然发生。如果发生突发,我只需要处理一次。去抖就是这样做的。
然而,debounce 只给出了 burst 的最后一个元素,但我需要知道 burst 中的所有元素以聚合它们(使用 flatmap)。
这可以通过定时 window 或缓冲区来完成,但是,这些是固定间隔,因此 buffer/window 超时可能发生在突发的中间,因此将突发分成 2要处理的零件而不是 1 个。
所以我想要的是
.
.
event: a
.
. -> a
.
.
.
.
.
.event: b
.event: c
.event: d
.
.-> b,c,d
.
.
.
.
.event : e
.
. -> e
.
您可能正在寻找 bufferWithTimeOrCount
来自页面:
/* Hitting the count buffer first */
var source = Rx.Observable.interval(100)
.bufferWithTimeOrCount(500, 3)
.take(3);
var subscription = source.subscribe(
function (x) {
console.log('Next: ' + x.toString());
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
// => Next: 0,1,2
// => Next: 3,4,5
// => Next: 6,7,8
// => Completed
这可以通过 buffer 通过将去抖流作为关闭选择器传入来实现,例如:
var s = Rx.Observable.of('a')
.merge(Rx.Observable.of('b').delay(100))
.merge(Rx.Observable.of('c').delay(150))
.merge(Rx.Observable.of('d').delay(200))
.merge(Rx.Observable.of('e').delay(300))
.share()
;
s.buffer(s.debounce(75)).subscribe(x => console.log(x));
我的用例如下:我收到事件,这些事件有时会突然发生。如果发生突发,我只需要处理一次。去抖就是这样做的。
然而,debounce 只给出了 burst 的最后一个元素,但我需要知道 burst 中的所有元素以聚合它们(使用 flatmap)。
这可以通过定时 window 或缓冲区来完成,但是,这些是固定间隔,因此 buffer/window 超时可能发生在突发的中间,因此将突发分成 2要处理的零件而不是 1 个。
所以我想要的是
.
.
event: a
.
. -> a
.
.
.
.
.
.event: b
.event: c
.event: d
.
.-> b,c,d
.
.
.
.
.event : e
.
. -> e
.
您可能正在寻找 bufferWithTimeOrCount
来自页面:
/* Hitting the count buffer first */
var source = Rx.Observable.interval(100)
.bufferWithTimeOrCount(500, 3)
.take(3);
var subscription = source.subscribe(
function (x) {
console.log('Next: ' + x.toString());
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
// => Next: 0,1,2
// => Next: 3,4,5
// => Next: 6,7,8
// => Completed
这可以通过 buffer 通过将去抖流作为关闭选择器传入来实现,例如:
var s = Rx.Observable.of('a')
.merge(Rx.Observable.of('b').delay(100))
.merge(Rx.Observable.of('c').delay(150))
.merge(Rx.Observable.of('d').delay(200))
.merge(Rx.Observable.of('e').delay(300))
.share()
;
s.buffer(s.debounce(75)).subscribe(x => console.log(x));