从不可预测的源 Observable 构建 "Heartbeat" Observable

Build "Heartbeat" Observable from Unpredictable Source Observable

我有一个 Observable,source,它可能会在不可预测的时间发出项目。我正在尝试使用它来构建另一个每 500 毫秒可靠地发出其值的 Observable。

假设 source 在这些时间发出值:

我想“平滑”这个流,以便我得到如下输出:

一种天真的方法可能只是在源项的排放之间添加延迟。但是,这不会像我想要的那样创建均匀间隔的间隔。

我已经尝试了 .timer().interval().flatMap() 的各种组合,但没有任何希望。

您可以结合使用 combineLatestintervalthrottle - 您可以添加第二个可观察对象,interval 以及您想要的调用间隔时间(例如 500 毫秒), 所以你的 observable 将每 500 毫秒发射一次(当与 combineLatest 一起使用时),现在它将每 500 毫秒发射一次值 每次原始 source 发射时,所以你可以在管道中添加 throttle ,这将导致间隔节流:

combineLatest([source, timer(5000)])
  .pipe(
    throttle(() => interval(5000)),
    tap(([value]) => {
      console.log("emitted", value, new Date().getSeconds());
    })
  )
  .subscribe();

(tap这里不用加,只是为了演示)

我想你可以试试这个:

const src$ = merge(
  timer(100).pipe(mapTo(1)),
  timer(980).pipe(mapTo(2)),
  timer(1020).pipe(mapTo(3)),
  timer(1300).pipe(mapTo(4))
);

src$
  .pipe(
    bufferTime(500),
    mergeAll()
  )
  .subscribe(console.log);

bufferTime 用于创建一个计时器,该计时器将以恒定间隔发出,而不管发出的值如何。然后 mergeAll 用于 分解 bufferTime.

产生的数组

StackBlitz demo.

对于比您的间隔发射得更快的源

zip 所需时间跨度 interval 的来源。

zip(source, interval(500)).pipe(
  map(([value, _]) => value)  // only emit the source value
)

zip 发出来自 source 的第一项和来自 interval 的第一项,然后发出来自 source 的第二项和来自 interval 的第二项] 等等。如果输出 observable 只应在 interval 发出时发出,则 source 的第 N 个值必须在 来自 interval 的第 N 个值之前到达

潜在问题: 如果您的 source 在某些时候发出的速度比 interval 慢(即来自 source 的第 N 个值到达 来自 interval 的第 N 个值之后) 那么 zip 会直接发射而不用等待下一次 interval 发射。

// the 5th/6th value from source arrive after the 5th/6th value from interval
                                              v    v
source:       -1--------2-3---4---------------5----6-----
interval:     -----1-----2-----3-----4-----5-----6-----7-
zip output:   -----1-----2-----3-----4--------5----6-----
                   ✓     ✓     ✓     ✓        ⚠️    ⚠️
// emits 5 and 6 don't happen when interval emits

对于以任何速率发射的源

function emitOnInterval<T>(period: number): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) =>
    defer(() => {
      let sourceCompleted = false;
      const queue = source.pipe(
        tap({ complete: () => (sourceCompleted = true) }),
        scan((acc, curr) => (acc.push(curr), acc), []) // collect all values in a buffer
      );
      return interval(period).pipe(
        withLatestFrom(queue), // combine with the latest buffer
        takeWhile(([_, buffer]) => !sourceCompleted || buffer.length > 0), // complete when the source completed and the buffer is empty
        filter(([_, buffer]) => buffer.length > 0), // only emit if there is at least on value in the buffer
        map(([_, buffer]) => buffer.shift()) // take the first value from the buffer
      );
    });
}

source.pipe(
  emitOnInterval(500)
)
// the 5th/6th value from source arrive after the 5th/6th value from interval
                                              v    v
source:       -1--------2-3---4---------------5----6-----
interval:     -----1-----2-----3-----4-----5-----6-----7-
output:       -----1-----2-----3-----4-----------5-----6-
                   ✓     ✓     ✓     ✓           ✓     ✓   
// all output emits happen when interval emits

https://stackblitz.com/edit/rxjs-qdlktm?file=index.ts