RxJS - 分享 'expand' 产生的无限流

RxJS - Share infinite stream produced by 'expand'

我有一个位于网络服务中的分页第三方资源。我想做的是将分页资源转换为值流,并让客户决定使用多少元素。也就是说,客户端应该不知道原始资源是分页的。

到目前为止我得到了以下代码:

import { from, Observable, of } from 'rxjs';
import { expand, mergeMap } from 'rxjs/operators';

interface Result {
  page: number;
  items: number[];
}

// Assume that this does a real HTTP request
function fetchPage(page: number = 0): Observable<Result> {
  let items = [...Array(3).keys()].map((e) => e + 3 * page);
  console.log('Fetching page ' + page);
  return of({ page, items });
}

// Turn a paginated request into an infinite stream of 'number'
function mkInfiniteStream(): Observable<number> {
  return fetchPage().pipe(
    expand((res) => fetchPage(res.page + 1)),
    mergeMap((res) => from(res.items))
  );
}

const infinite$ = mkInfiniteStream();

这非常有效:我得到了一个懒惰的无限数字流,客户端可以只执行 infinite$.pipe(take(n)) 并获取第一个 n 元素,而不知道底层资源已分页。

现在,我想做的是在处理多个订阅者时共享这些值,即:

infinite$.pipe(take(10)).subscribe((v) => console.log('[1] got ', v));

// Assume that later we have new subscribers
setTimeout(() => {
  infinte$.pipe(take(5)).subscribe((v) => console.log('[2] got ', v));
}, 1000);

setTimeout(() => {
  infinte$.pipe(take(4)).subscribe((v) => console.log('[3] got ', v));
}, 1500);

如您所见,我们将有多个订阅者订阅无限流,我想重用已经发出的值以减少fetchPage 电话。在此示例中,一旦客户端请求 10 个项目 (take(10)),那么请求少于 10 个项目(例如 5 个项目)的任何客户端都不会调用 fetchPage,因为 那些物品已经发出.


我尝试了以下方法,但无法获得所需的行为:

const infinite$ = mkInfiniteStream().pipe(share())

不起作用,因为迟到的订阅者会导致多次调用 'fetchPage'。

const infinite$ = mkInfiniteStream().pipe(shareReplay())

强制流中的所有值,即使不需要它们(还没有客户要求所有项目)


如有任何提示,我们将不胜感激。如果有人想尝试代码:https://stackblitz.com/edit/n4ywfw

因为你想要 http 调用的 ahing,你必须在你的 fetchPage 方法中移动 shareReplay 以使其工作

return fetchPage().pipe(
 expand((res) => fetchPage(res.page + 1)),
 mergeMap((res) => from(res.items)),
 shareReplay()
);

https://stackblitz.com/edit/n4ywfw-xkajmp?file=index.ts

这是我为您更新的另一个示例,您可以注意到调用只进行了一次

https://stackblitz.com/edit/angular-ivy-o9pjw6?file=src/app/app.component.ts

看来我们需要找到一种方法来存储一些状态,例如最后读取的页码和获取的项目,并确保在新订阅到达时重播此状态。

一种可能是以这种方式处理闭包。

首先,您创建一个 infiniteStreamFactoryGenerator 函数,其中 returns 一个函数通过闭包保存一些状态,特别是 lastPageitemsRead。这种状态在 infiniteStreamFactoryGenerator 函数返回的函数中更新。

function infiniteStreamFactoryGenerator() {
  let lastPage = 0;
  let itemsRead = [];
  return () => {
    // if  there are items read we return them first and then we start
    // the infinite stream
    return concat(
      from(itemsRead),
      fetchPage(lastPage).pipe(
        expand((res) => {
          return res.page < 10 ? fetchPage(res.page + 1) : EMPTY;
        }),
        tap((res) => {
          // here we update the state
          lastPage++;
          itemsRead = [...itemsRead, ...res.items];
        }),
        mergeMap((res) => from(res.items))
      )
    );
  };
}

然后我们调用 infiniteStreamFactoryGenerator 来创建真正的工厂函数,并使用这样的工厂函数来实例化各种流,就像这样

const infiniteStreamFactory$ = infiniteStreamFactoryGenerator();

infiniteStreamFactory$()
  .pipe(take(10))
  .subscribe((v) => console.log("[1] got ", v));

// Assume that later we have new subscribers
setTimeout(() => {
  infiniteStreamFactory$()
    .pipe(take(5))
    .subscribe((v) => console.log("[2] got ", v));
}, 1000);

setTimeout(() => {
  infiniteStreamFactory$()
    .pipe(take(4))
    .subscribe((v) => console.log("[3] got ", v));
}, 1500);

setTimeout(() => {
  infiniteStreamFactory$()
    .pipe(take(20))
    .subscribe((v) => console.log("[4] got ", v));
}, 2000);

如您所见,我还添加了第四个订阅者,需要加载其他页面。

根据this stackblitz,这个解决方案似乎可行。

老实说,必须构建一个 工厂生成器函数 的想法表明我可能有更简单的方法来解决这个有趣的问题,但我还没有找到它.

我最终得到的解决方案涉及使用闭包创建有状态的 Observable。我们制作了自己的 share 版本,并将其固定到 Observable 中。

唯一的变化是 mkInfiniteStream 从原来的代码:

function mkInfiniteStream(): Observable<number> {
  let lastPage = 0;
  let itemsRead = [];
  return defer(() =>
    from(itemsRead).pipe(
      concatWith(
        fetchPage(lastPage).pipe(
          expand((res) => fetchPage(res.page + 1)),
          tap((res) => {
            lastPage++;
            itemsRead = [...itemsRead, ...res.items];
          }),
          mergeMap((res) => from(res.items))
        )
      )
    )
  );
}

// Usage is still the same:
const infinite$ = mkInfiniteStream();

想法是在内部跟踪所有项目和获取的最后一页。当请求尚未获取的项目时,Observable 会使用下一页的项目进行扩展。这反过来会更新所有可用的项目 (itemsRead = itemsRead = [...itemsRead, ...res.items]),因此新订阅者将在强制扩展 Observable 之前使用已经获取的存储在 itemsRead 中的项目。

使用 share 对此 Observable 没有任何作用,因为数据已经被实现共享。使用 shareReplay 会强制无限流,因此不建议这样做!

我希望有一个更“纯粹”的实现,所以如果你想出一个,请分享它!


感谢@Picci 的初步想法。