RxJS - 对 maxWait 和 maxElements 使用 windowWhen() windows

RxJS - using windowWhen() for maxWait and maxElements windows

我正在尝试将对服务器的调用捆绑到最大 maxEntries,但不想等待超过 maxWait 毫秒。这曾经在 RxJS 4 中作为 windowWithTimeOrCount() 提供,但已从 RxJS 5 中删除。

一切正常,只是 window 的最后一个元素丢失了。说到 'lost' - 这就是我现在的感受。有哪位 RxJS 大师可以告诉我我做错了什么吗?

 private chunk(queue: Observable<CacheEntry>, maxEntries: number, maxWait: number): Observable<Observable<CacheEntry>> {

    // We have an incoming stream of CacheEntries to be retrieved. We want to bundle these in units of max maxEntries
    // but wait no longer than max maxWait ms. We return an Observable, that emits Observables of CacheEntries that
    // complete after maxEntries / maxWait (whatever comes first).
    const toggleSubject = new Subject<void>();

    return queue

    // Start emitting a new Observable every time toggleSubject emits.
    // (bufferWhen() wouldn't work as we have to count the elements as they come and buffer only gives us the
    // complete collection)
      .windowWhen(() => toggleSubject)

      // map() is called once for every window (maxEntries/maxWait)
      // the inner do() is called for every element in the window, allowing us to set up the timeout callback and to
      // count all elements, then emitting on toggleSubject, triggering a new Observable.
      // (We have to map() here - instead of an outer do() -  because otherwise the original obs would be streamed
      // and the hooked up version with the inner do() would never be called.)
      .map((obs) => {
        // counts the number of cacheEntries already in this stream
        let count = 0;
        // flag to kill the timeout callback
        let done = false;
        // we have to return an Observable
        return obs.do(() => {
            count++;
            if (count === 1) {
              // we start counting when the first element is streamed.
              IntervalObservable.create(maxWait).first().subscribe(() => {
                if (!done) {
                  //trigger due to maxWait
                  toggleSubject.next(null);
                }
              });
            }
            if (count > (maxEntries)) {
              done = true;
              // trigger due due to maxEntries(' + maxEntries + ')');
              toggleSubject.next(null);
            }
          }
        );
      });
  }

由于 if (count > (maxEntries)) 而触发 toggleSubject.next(null) 的元素丢失了(不在任何 window 中)。

编辑:maxTime 在推送新 Observable 的第一个元素时开始计时。 if (count === 1 )。这是 a) 我在 map() 中的 windowed Observables 内部工作的原因和 b) 重要的,因为这是必需的行为。

示例:maxElements:100,maxWait:100。101 个元素在 t=99 时被推送。预期行为:在 t=99 时,一个包含 100 个元素的 Observable 被推送。剩下 1 个元素。计数器 + 定时器复位。在 t=199,第二个 'chunk' 的计数器到期并推送一个具有 1 个元素的 Observable。

(在这个例子中,Brandons(见答案)代码会——如果我没看错的话——在 t=99 时推送一个包含 100 个元素的 Observable,然后 1 毫秒后,在 t =100,一个只有一个元素的 Observable。)

是的,您不想使用 map 来产生这样的副作用。正如您所注意到的,您最终会掉落物品。

这是一个通用方法,我认为它可以满足您的需求。

注意:RXJS 5 目前有一个 issue 具有此发布重载的类型定义。我添加了一些应该允许它在 TypeScript 中编译的类型转换。

chunk<T>(queue: Observable<T>, maxEntries: number, maxWait: number): Observable<Observable<T>> {
    // use publish() so that we can subscribe multiple times to the same stream of data.
    return queue.publish(entries => {
        // observable which will trigger after maxWait
        const timer = IntervalObservable.create(maxWait);
        // observable which will trigger after maxEntries
        const limit = entries.take(maxEntries).last();
        // observable which will trigger on either condition
        const endOfWindow = limit.takeUntil(timer);

        // use endOfWindow to close each window.
        return entries.windowWhen(() => endOfWindow) as Observable<T>;
    }) as Observable<Observable<T>>;
}

编辑:

如果您不想在每个 window 中的第一个项目到达之后才开始计时,那么您可以这样做:

chunk<T>(queue: Observable<T>, maxEntries: number, maxWait: number): Observable<Observable<T>> {
    // use publish() so that we can subscribe multiple times to the same stream of data.
    return queue.publish(entries => {
        // observable which will trigger after maxWait after the first
        // item in this window arrives:
        const timer = entries.take(1).delay(maxWait);
        // observable which will trigger after maxEntries
        const limit = entries.take(maxEntries).last();
        // observable which will trigger on either condition
        const endOfWindow = limit.takeUntil(timer);

        // use endOfWindow to close each window.
        return entries.windowWhen(() => endOfWindow) as Observable<T>;
    }) as Observable<Observable<T>>;
}

我想出的解决方案是在异步调度程序上切换windowWhen()

if (count === (maxEntries)) {
  done = true;
  this.LOGGER.debug(' - trigger due due to maxEntries(' + maxEntries + ')');
  Rx.Scheduler.async.schedule(()=>toggleSubject.next(null));
}

问题是 windowWhen() 立即完成了返回的 Observables - 阻止了任何下游操作员接收最后一个值。

很抱歉提出(和回答)这个问题。在发帖之前我尝试了 Rx.Scheduler.async 等,但不知何故它似乎不起作用。