RxJs——在每次突发事件后重播所有事件

RxJs -- replay all events after each spurt of events

你是怎么做到的? RxJs 对我来说还是个谜。

我正在尝试类似的东西:

filterChanges
   .delay(400)
   .replay()
   .reduce(function(acc,x) { return acc.concat(x) }, [])
   .subscribe(function(changes) {
      console.log(changes);
      ...

 filterChanges.subscribe(function() {
     filterChanges.aggregate(function(changes) {
          ...

我真的迷路了。我想要的行为是:某些用户操作会导致多个过滤器更改。我不想一次一个地处理它们,而只是在变化的激增完成时才处理它们。但是当我处理它们时,我希望所有过滤器都从流的开头更改。

既然我写了这篇文章,我意识到在突然发生变化后只获取所有过滤器会更好,而不是所有变化本身,所以我只需要赶上突然发生的结束。我想要这两个问题的答案,因为我认为这将帮助我更好地理解 RxJs。

  1. 订阅突发事件结束。
  2. 订阅突发事件的结尾并捕获来自 流的开头。

如果我理解正确,请将 bufferdebounce 结合使用。 Buffer 将为您提供缓冲区 window 期间发生的事件列表,而 debounce 将定义缓冲区 window 何时应关闭,这就是您如何在突发事件中获取所有事件。要从流的最开始获取所有事件,您 scan 从缓冲区累积所有列表。

var hotFilterChanges = filterChanges.share();

hotFilterChanges
  .buffer(hotFilterChanges.debounce(200))
  .scan((acc, curr) => acc.concat(curr))
  .subscribe(x => console.log(x));

我们使用 .share() 使 Observable 成为 "hot"。这是必要的,所以 hotFilterChange.buffer( ... )hotFilterChanges.debounce(200) 指的是完全相同的 Observable 执行。

See the JSBin.