RxJS - 分享 'expand' 产生的无限流
RxJS - Share infinite stream produced by 'expand'
我有一个位于网络服务中的分页第三方资源。我想做的是将分页资源转换为值流,并让客户决定使用多少元素。也就是说,客户端应该不知道原始资源是分页的。
到目前为止我得到了以下代码:
import { from, Observable, of } from 'rxjs';
import { expand, mergeMap } from 'rxjs/operators';
interface Result {
page: number;
items: number[];
}
// Assume that this does a real HTTP request
function fetchPage(page: number = 0): Observable<Result> {
let items = [...Array(3).keys()].map((e) => e + 3 * page);
console.log('Fetching page ' + page);
return of({ page, items });
}
// Turn a paginated request into an infinite stream of 'number'
function mkInfiniteStream(): Observable<number> {
return fetchPage().pipe(
expand((res) => fetchPage(res.page + 1)),
mergeMap((res) => from(res.items))
);
}
const infinite$ = mkInfiniteStream();
这非常有效:我得到了一个懒惰的无限数字流,客户端可以只执行 infinite$.pipe(take(n))
并获取第一个 n
元素,而不知道底层资源已分页。
现在,我想做的是在处理多个订阅者时共享这些值,即:
infinite$.pipe(take(10)).subscribe((v) => console.log('[1] got ', v));
// Assume that later we have new subscribers
setTimeout(() => {
infinte$.pipe(take(5)).subscribe((v) => console.log('[2] got ', v));
}, 1000);
setTimeout(() => {
infinte$.pipe(take(4)).subscribe((v) => console.log('[3] got ', v));
}, 1500);
如您所见,我们将有多个订阅者订阅无限流,我想重用已经发出的值以减少fetchPage
电话。在此示例中,一旦客户端请求 10 个项目 (take(10)
),那么请求少于 10 个项目(例如 5 个项目)的任何客户端都不会调用 fetchPage
,因为 那些物品已经发出.
我尝试了以下方法,但无法获得所需的行为:
const infinite$ = mkInfiniteStream().pipe(share())
不起作用,因为迟到的订阅者会导致多次调用 'fetchPage'。
const infinite$ = mkInfiniteStream().pipe(shareReplay())
强制流中的所有值,即使不需要它们(还没有客户要求所有项目)
如有任何提示,我们将不胜感激。如果有人想尝试代码:https://stackblitz.com/edit/n4ywfw
因为你想要 http 调用的 ahing,你必须在你的 fetchPage 方法中移动 shareReplay 以使其工作
return fetchPage().pipe(
expand((res) => fetchPage(res.page + 1)),
mergeMap((res) => from(res.items)),
shareReplay()
);
https://stackblitz.com/edit/n4ywfw-xkajmp?file=index.ts
这是我为您更新的另一个示例,您可以注意到调用只进行了一次
https://stackblitz.com/edit/angular-ivy-o9pjw6?file=src/app/app.component.ts
看来我们需要找到一种方法来存储一些状态,例如最后读取的页码和获取的项目,并确保在新订阅到达时重播此状态。
一种可能是以这种方式处理闭包。
首先,您创建一个 infiniteStreamFactoryGenerator
函数,其中 returns 一个函数通过闭包保存一些状态,特别是 lastPage
和 itemsRead
。这种状态在 infiniteStreamFactoryGenerator
函数返回的函数中更新。
function infiniteStreamFactoryGenerator() {
let lastPage = 0;
let itemsRead = [];
return () => {
// if there are items read we return them first and then we start
// the infinite stream
return concat(
from(itemsRead),
fetchPage(lastPage).pipe(
expand((res) => {
return res.page < 10 ? fetchPage(res.page + 1) : EMPTY;
}),
tap((res) => {
// here we update the state
lastPage++;
itemsRead = [...itemsRead, ...res.items];
}),
mergeMap((res) => from(res.items))
)
);
};
}
然后我们调用 infiniteStreamFactoryGenerator
来创建真正的工厂函数,并使用这样的工厂函数来实例化各种流,就像这样
const infiniteStreamFactory$ = infiniteStreamFactoryGenerator();
infiniteStreamFactory$()
.pipe(take(10))
.subscribe((v) => console.log("[1] got ", v));
// Assume that later we have new subscribers
setTimeout(() => {
infiniteStreamFactory$()
.pipe(take(5))
.subscribe((v) => console.log("[2] got ", v));
}, 1000);
setTimeout(() => {
infiniteStreamFactory$()
.pipe(take(4))
.subscribe((v) => console.log("[3] got ", v));
}, 1500);
setTimeout(() => {
infiniteStreamFactory$()
.pipe(take(20))
.subscribe((v) => console.log("[4] got ", v));
}, 2000);
如您所见,我还添加了第四个订阅者,需要加载其他页面。
根据this stackblitz,这个解决方案似乎可行。
老实说,必须构建一个 工厂生成器函数 的想法表明我可能有更简单的方法来解决这个有趣的问题,但我还没有找到它.
我最终得到的解决方案涉及使用闭包创建有状态的 Observable。我们制作了自己的 share
版本,并将其固定到 Observable 中。
唯一的变化是 mkInfiniteStream
从原来的代码:
function mkInfiniteStream(): Observable<number> {
let lastPage = 0;
let itemsRead = [];
return defer(() =>
from(itemsRead).pipe(
concatWith(
fetchPage(lastPage).pipe(
expand((res) => fetchPage(res.page + 1)),
tap((res) => {
lastPage++;
itemsRead = [...itemsRead, ...res.items];
}),
mergeMap((res) => from(res.items))
)
)
)
);
}
// Usage is still the same:
const infinite$ = mkInfiniteStream();
想法是在内部跟踪所有项目和获取的最后一页。当请求尚未获取的项目时,Observable 会使用下一页的项目进行扩展。这反过来会更新所有可用的项目 (itemsRead = itemsRead = [...itemsRead, ...res.items]
),因此新订阅者将在强制扩展 Observable 之前使用已经获取的存储在 itemsRead
中的项目。
使用 share
对此 Observable 没有任何作用,因为数据已经被实现共享。使用 shareReplay
会强制无限流,因此不建议这样做!
我希望有一个更“纯粹”的实现,所以如果你想出一个,请分享它!
感谢@Picci 的初步想法。
我有一个位于网络服务中的分页第三方资源。我想做的是将分页资源转换为值流,并让客户决定使用多少元素。也就是说,客户端应该不知道原始资源是分页的。
到目前为止我得到了以下代码:
import { from, Observable, of } from 'rxjs';
import { expand, mergeMap } from 'rxjs/operators';
interface Result {
page: number;
items: number[];
}
// Assume that this does a real HTTP request
function fetchPage(page: number = 0): Observable<Result> {
let items = [...Array(3).keys()].map((e) => e + 3 * page);
console.log('Fetching page ' + page);
return of({ page, items });
}
// Turn a paginated request into an infinite stream of 'number'
function mkInfiniteStream(): Observable<number> {
return fetchPage().pipe(
expand((res) => fetchPage(res.page + 1)),
mergeMap((res) => from(res.items))
);
}
const infinite$ = mkInfiniteStream();
这非常有效:我得到了一个懒惰的无限数字流,客户端可以只执行 infinite$.pipe(take(n))
并获取第一个 n
元素,而不知道底层资源已分页。
现在,我想做的是在处理多个订阅者时共享这些值,即:
infinite$.pipe(take(10)).subscribe((v) => console.log('[1] got ', v));
// Assume that later we have new subscribers
setTimeout(() => {
infinte$.pipe(take(5)).subscribe((v) => console.log('[2] got ', v));
}, 1000);
setTimeout(() => {
infinte$.pipe(take(4)).subscribe((v) => console.log('[3] got ', v));
}, 1500);
如您所见,我们将有多个订阅者订阅无限流,我想重用已经发出的值以减少fetchPage
电话。在此示例中,一旦客户端请求 10 个项目 (take(10)
),那么请求少于 10 个项目(例如 5 个项目)的任何客户端都不会调用 fetchPage
,因为 那些物品已经发出.
我尝试了以下方法,但无法获得所需的行为:
const infinite$ = mkInfiniteStream().pipe(share())
不起作用,因为迟到的订阅者会导致多次调用 'fetchPage'。
const infinite$ = mkInfiniteStream().pipe(shareReplay())
强制流中的所有值,即使不需要它们(还没有客户要求所有项目)
如有任何提示,我们将不胜感激。如果有人想尝试代码:https://stackblitz.com/edit/n4ywfw
因为你想要 http 调用的 ahing,你必须在你的 fetchPage 方法中移动 shareReplay 以使其工作
return fetchPage().pipe(
expand((res) => fetchPage(res.page + 1)),
mergeMap((res) => from(res.items)),
shareReplay()
);
https://stackblitz.com/edit/n4ywfw-xkajmp?file=index.ts
这是我为您更新的另一个示例,您可以注意到调用只进行了一次
https://stackblitz.com/edit/angular-ivy-o9pjw6?file=src/app/app.component.ts
看来我们需要找到一种方法来存储一些状态,例如最后读取的页码和获取的项目,并确保在新订阅到达时重播此状态。
一种可能是以这种方式处理闭包。
首先,您创建一个 infiniteStreamFactoryGenerator
函数,其中 returns 一个函数通过闭包保存一些状态,特别是 lastPage
和 itemsRead
。这种状态在 infiniteStreamFactoryGenerator
函数返回的函数中更新。
function infiniteStreamFactoryGenerator() {
let lastPage = 0;
let itemsRead = [];
return () => {
// if there are items read we return them first and then we start
// the infinite stream
return concat(
from(itemsRead),
fetchPage(lastPage).pipe(
expand((res) => {
return res.page < 10 ? fetchPage(res.page + 1) : EMPTY;
}),
tap((res) => {
// here we update the state
lastPage++;
itemsRead = [...itemsRead, ...res.items];
}),
mergeMap((res) => from(res.items))
)
);
};
}
然后我们调用 infiniteStreamFactoryGenerator
来创建真正的工厂函数,并使用这样的工厂函数来实例化各种流,就像这样
const infiniteStreamFactory$ = infiniteStreamFactoryGenerator();
infiniteStreamFactory$()
.pipe(take(10))
.subscribe((v) => console.log("[1] got ", v));
// Assume that later we have new subscribers
setTimeout(() => {
infiniteStreamFactory$()
.pipe(take(5))
.subscribe((v) => console.log("[2] got ", v));
}, 1000);
setTimeout(() => {
infiniteStreamFactory$()
.pipe(take(4))
.subscribe((v) => console.log("[3] got ", v));
}, 1500);
setTimeout(() => {
infiniteStreamFactory$()
.pipe(take(20))
.subscribe((v) => console.log("[4] got ", v));
}, 2000);
如您所见,我还添加了第四个订阅者,需要加载其他页面。
根据this stackblitz,这个解决方案似乎可行。
老实说,必须构建一个 工厂生成器函数 的想法表明我可能有更简单的方法来解决这个有趣的问题,但我还没有找到它.
我最终得到的解决方案涉及使用闭包创建有状态的 Observable。我们制作了自己的 share
版本,并将其固定到 Observable 中。
唯一的变化是 mkInfiniteStream
从原来的代码:
function mkInfiniteStream(): Observable<number> {
let lastPage = 0;
let itemsRead = [];
return defer(() =>
from(itemsRead).pipe(
concatWith(
fetchPage(lastPage).pipe(
expand((res) => fetchPage(res.page + 1)),
tap((res) => {
lastPage++;
itemsRead = [...itemsRead, ...res.items];
}),
mergeMap((res) => from(res.items))
)
)
)
);
}
// Usage is still the same:
const infinite$ = mkInfiniteStream();
想法是在内部跟踪所有项目和获取的最后一页。当请求尚未获取的项目时,Observable 会使用下一页的项目进行扩展。这反过来会更新所有可用的项目 (itemsRead = itemsRead = [...itemsRead, ...res.items]
),因此新订阅者将在强制扩展 Observable 之前使用已经获取的存储在 itemsRead
中的项目。
使用 share
对此 Observable 没有任何作用,因为数据已经被实现共享。使用 shareReplay
会强制无限流,因此不建议这样做!
我希望有一个更“纯粹”的实现,所以如果你想出一个,请分享它!
感谢@Picci 的初步想法。