RxJs observable 监听 10 秒,return 只接收到 5 个值,丢弃其余的,继续监听?

RxJs observable to listen for 10 seconds, return only 5 of the received values, discard the rest, and continue listening?

我有一个 observable,它将从 SignalR 集线器获取多个实时交易值(可能每秒很多)。我想要实现的是一个连续(每 10 秒)输出在最后 10 秒内发生的 5 次交易样本的可观察对象。

我写了一个可观察的管道来尝试通过将所有收到的交易添加到缓冲区中 10 秒来实现这一点,然后使用 'concatMap' 和为缓冲区数组中的每个交易创建一个可观察的'from'。然后,创建另一个收集 5 个值并发出它们的缓冲区。

this.bufferedTradeObservable$ = this.tradeReceived
      .pipe(
        tap(v => console.log('pipe-start: ', v)),
        distinct((e: Trade) => e.tradeId),
        bufferTime(10000),
        concatMap((tradeArray) => {
            return from(tradeArray);
        }),
        bufferCount(5),
        tap(v => console.log('pipe-end: ', v))
      );

但是,管道不断发出它在 10 秒 window 内接收到的所有值,但以 5 个为一组。我尝试在管道中添加一个 take(5) 在 concat 映射之后,并且它对第一批 5 个值正常工作,但随后 observable“完成”并停止侦听新值。我还尝试在 concatMap 之后添加一个带索引的过滤器,如下所示:

filter((v, i) => (i < 6 )),

这适用于第一批 5 个值,但随后会不断过滤掉每个值,因此永远不会创建第二个 5 个缓冲区。此外,过滤器的这个用例似乎已被弃用。

我不确定我是否忽略了这里明显的东西,但我查看了许多 rxjs 运算符,但找不到实现此目的的方法

这样的事情怎么样,

let n = 5;
let t = 10;

//Source, emits a value every second (just a placeholder for real source)

const source = interval(1000);

//Take n=5 values from the source, then end the stream
const takeNValues = source.pipe(take(n));

//Every t=10 seconds switch to a new observable that emits n=5 values and then closes

const takeNValuesEveryTSeconds = interval(t * 1000).pipe(
  switchMap(() => takeNValues)
);

//Subscribe and log n=5 values every t=10 seconds

takeNValuesEveryTSeconds.subscribe(n => 
  console.log('Value => ', n)
);

bufferTime 有一个 maxBufferSize 参数可以为您完成此操作。

this.bufferedTradeObservable$ = this.tradeReceived
      .pipe(
        tap(v => console.log('pipe-start: ', v)),
        distinct((e: Trade) => e.tradeId),
        bufferTime(10000, 10000, 5),
        concatMap((tradeArray) => {
            return from(tradeArray);
        }),
        tap(v => console.log('pipe-end: ', v))
      );

您也可以使用 windowTime 来在每个值创建后立即输出,而不是等待所有 5 个值。

this.bufferedTradeObservable$ = this.tradeReceived
      .pipe(
        tap(v => console.log('pipe-start: ', v)),
        distinct((e: Trade) => e.tradeId),
        windowTime(10000, 10000, 5),
        mergeAll()
        tap(v => console.log('pipe-end: ', v))
      );

这些内容分别包含在 bufferTime and windowTime 的文档中。

听起来您只需要 bufferTime。您可以决定保留什么和扔掉什么。

this.bufferedTradeObservable$ = this.tradeReceived.pipe(
  // Buffer for 1 seconds
  bufferTime(10000),
  // Only emit the last 5 values from the buffer.
  map(buffer => buffer.slice(-5))
);