RxJs——在每次突发事件后重播所有事件
RxJs -- replay all events after each spurt of events
你是怎么做到的? RxJs 对我来说还是个谜。
我正在尝试类似的东西:
filterChanges
.delay(400)
.replay()
.reduce(function(acc,x) { return acc.concat(x) }, [])
.subscribe(function(changes) {
console.log(changes);
...
或
filterChanges.subscribe(function() {
filterChanges.aggregate(function(changes) {
...
我真的迷路了。我想要的行为是:某些用户操作会导致多个过滤器更改。我不想一次一个地处理它们,而只是在变化的激增完成时才处理它们。但是当我处理它们时,我希望所有过滤器都从流的开头更改。
既然我写了这篇文章,我意识到在突然发生变化后只获取所有过滤器会更好,而不是所有变化本身,所以我只需要赶上突然发生的结束。我想要这两个问题的答案,因为我认为这将帮助我更好地理解 RxJs。
- 订阅突发事件结束。
- 订阅突发事件的结尾并捕获来自
流的开头。
如果我理解正确,请将 buffer
与 debounce
结合使用。 Buffer 将为您提供缓冲区 window 期间发生的事件列表,而 debounce
将定义缓冲区 window 何时应关闭,这就是您如何在突发事件中获取所有事件。要从流的最开始获取所有事件,您 scan
从缓冲区累积所有列表。
var hotFilterChanges = filterChanges.share();
hotFilterChanges
.buffer(hotFilterChanges.debounce(200))
.scan((acc, curr) => acc.concat(curr))
.subscribe(x => console.log(x));
我们使用 .share()
使 Observable 成为 "hot"。这是必要的,所以 hotFilterChange.buffer( ... )
和 hotFilterChanges.debounce(200)
指的是完全相同的 Observable 执行。
你是怎么做到的? RxJs 对我来说还是个谜。
我正在尝试类似的东西:
filterChanges
.delay(400)
.replay()
.reduce(function(acc,x) { return acc.concat(x) }, [])
.subscribe(function(changes) {
console.log(changes);
...
或
filterChanges.subscribe(function() {
filterChanges.aggregate(function(changes) {
...
我真的迷路了。我想要的行为是:某些用户操作会导致多个过滤器更改。我不想一次一个地处理它们,而只是在变化的激增完成时才处理它们。但是当我处理它们时,我希望所有过滤器都从流的开头更改。
既然我写了这篇文章,我意识到在突然发生变化后只获取所有过滤器会更好,而不是所有变化本身,所以我只需要赶上突然发生的结束。我想要这两个问题的答案,因为我认为这将帮助我更好地理解 RxJs。
- 订阅突发事件结束。
- 订阅突发事件的结尾并捕获来自 流的开头。
如果我理解正确,请将 buffer
与 debounce
结合使用。 Buffer 将为您提供缓冲区 window 期间发生的事件列表,而 debounce
将定义缓冲区 window 何时应关闭,这就是您如何在突发事件中获取所有事件。要从流的最开始获取所有事件,您 scan
从缓冲区累积所有列表。
var hotFilterChanges = filterChanges.share();
hotFilterChanges
.buffer(hotFilterChanges.debounce(200))
.scan((acc, curr) => acc.concat(curr))
.subscribe(x => console.log(x));
我们使用 .share()
使 Observable 成为 "hot"。这是必要的,所以 hotFilterChange.buffer( ... )
和 hotFilterChanges.debounce(200)
指的是完全相同的 Observable 执行。