RxJS 在收到通知时加载并缓存前 N 个值,然后在下一次通知时一个一个地发出值

RxJS load and cache first N values when notified and then emit values one by one on next notifications

我正在构建一个 angular 应用程序,我正在尝试使用 RxJS 以反应方式尽可能多地实施。

我正在实现类似于旋转木马的功能 - 用户操作时会显示下一个项目。一次从 BE 收到一件物品。我正在努力让用户体验流畅。所以,我想在向用户显示任何内容之前预加载前 N 个项目。

加载 N 个项目后 - 我需要显示第一个加载的项目。当用户单击下一步时 - 我发出下一个预加载项目并触发下一个项目的预加载以确保预加载项目的数量始终为 N。

有点缓冲 - 不要让用户在每一步都等待。因此,在用户仍在查看之前的项目时预加载下一个项目。

我想在加载项目时使用 bufferCount(N) 之类的东西,然后使用 map((ar) => from(ar)) 展开数组。并与另一个通知程序 Subject 使用 zip 触发该缓冲区的排放。但它似乎工作得不是很好。看起来每 N 次排放我都有一些小故障,我首先看到一个项目,然后很快看到另一个项目。

不确定如何更好地实施它。这似乎应该是一个常见的用例。

----编辑----

项目是通过 http 加载的,对吧。 这是我在 atm 上的一些代码(不是真正的功能 - 只是概念):

//..........
// loadRndItem loads data using httpClient
this.loadingBuffer$ =  this.nextItemSubj.pipe(
      // here should somehow trigger first N loading processes
      flatMap(() => this.loadRndItem()),
      bufferCount(this.bufferSize),
      flatMap((ar) => {
        return from(ar);
      }),
      share(),
      takeUntil(this.endSubj)
    );

this.currentItem$ = zip(
    this.loadingBuffer$,
    this.nextItemSubj
  ).pipe(
    map(([val, _]) => val),
    share(),
    takeUntil(this.gameEndSubj)
  );

  //..........
function nextItem(): void {
  this.nextItemSubj.next();
}

我想我已经实现了我想要的行为。

不过可能有点老套。这里还有一个订阅。 这是代码:

nextItemSubj: Subject<void> = new Subject();

// loading CACHE_SIZE items on init. And load more on nextItemSubj emission
// spread the loaded array into separate emissions
this.itemsBuffer$ = concat(
    from(new Array(CACHE_SIZE).fill(null)),
    this.nextItemSubj
  ).pipe(
    flatMap(() => this.loadRndItem()),
    bufferCount(CACHE_SIZE),
    mergeMap((ar) => {
      return from(ar);
    }),
    share(),
    takeUntil(this.endSubj)
  );

// emit preloaded values one by one when nextItemSubj.next() is called
this.currentQuizData$ = zip(
    this.itemsBuffer$,
    this.nextItemSubj
  ).pipe(
    map(([val, _]) => val),
    shareReplay(),
    takeUntil(this.endSubj)
  );

// when itemBuffer$ first emits (after CACHE_SIZE items are preloaded)
// this will trigger first emission of currentItem$ to display
// and also trigger next item loading
this.itemsBuffer$.pipe(take(1)).subscribe(() => this.nextItemSubj.next());

我想你可以这样做:

const click$ = new Subject();
const loadMore$ = new Subject();

const buffer$ = concat(of(N), loadMore$)
  .pipe(
    // Unpack the array of results with `concatAll()` into separate emissions
    concatMap(n => forkJoin(makeNRequests(n)).pipe(concatAll())),
  );

zip(buffer$, click$.pipe(startWith(1)))
  .pipe(
     map((result, index) => {
       if (index > 0) { // We need `index` so that's why this is not inside `tap`.
         loadMore$.next(1);
       }
       return result[0];
     }),
   )
  .subscribe(console.log);

现场演示:https://stackblitz.com/edit/rxjs-kdh9uk?file=index.ts