源发射速率快时的 Rxjs 缓冲区

Rxjs buffer when source emitions rate is fast

比方说,我有一个每秒 x0 次发射(可能是 50、60 次……)的可观察到的发射,有时它只是每秒发射 1 次或 2 次。

现在我怎样才能缓冲那些快速发射并仍然处理慢速发射。

我累的是:

BufferTime 需要一个时间跨度,因此即使一次发射也会被缓冲,(加上 BufferTime 会使量角器测试超时)。

BufferCount(x) 在收到所有 x 次发射后才会发射。

听起来你想要类似于 debounce + buffer 的东西。最简单的实现是使用流的反跳来触发发出同一流的缓冲区。您可能希望共享流以防止重复订阅。这是一个 运行 示例:

const source = new Rx.Observable.create((o) => {
  let count = 0;
  const emit = () => {
    const timeout = Math.random() * 1000;
    setTimeout(() => {
      o.next(count++);
      if (count < 20) {
       emit();
      } else {
        o.complete();
      }
    }, timeout);
  };
  emit();
}).share();

const triggerBuffer = source.debounceTime(500);
source.buffer(triggerBuffer).subscribe((x) => { console.log(x); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.9/Rx.min.js"></script>

请注意,去抖没有上限,因为如果它在去抖时间内继续接收值,它就不会发出。实际上,这不会对您的场景产生影响,但在其他场景中,理论上可以。

如 bygrace 所述,您要找的是 debounce + buffer

在现代 RXJS 6 和 ES6 Typescript 中添加类型推断,我创建了一个自定义 OperatorFunction 来非常简单地做到这一点,称为 bufferDebounce.

type BufferDebounce = <T>(debounce: number) => OperatorFunction<T, T[]>;

const bufferDebounce: BufferDebounce = debounce => source =>
  new Observable(observer => 
    source.pipe(buffer(source.pipe(debounceTime(debounce)))).subscribe({
      next(x) {
        observer.next(x);
      },
      error(err) {
        observer.error(err);
      },
      complete() {
        observer.complete();
      },
  })
// [as many sources until no emit during 500ms]
source.pipe(bufferDebounce(500)).subscribe(console.log);

你可以在这里看到一个工作示例https://stackblitz.com/edit/rxjs6-buffer-debounce

希望这对您和任何新人有所帮助。