RxJS:批量请求和共享响应
RxJS: Batch requests and share response
让我们假设我有一个函数 fetchUser
,它将 userId
和 return 作为参数 用户 的可观察值。
因为我经常调用此方法,所以我想 批处理 ID 以执行一个具有多个 ID 的请求!
我的麻烦开始了...
如果不在 fetchUser
的不同调用之间共享一个 observable,我找不到解决方案。
import { Subject, from } from "rxjs"
import { bufferTime, mergeMap, map, toArray, filter, take, share } from "rxjs/operators"
const functionThatSimulateAFetch = (userIds: string[]) => from(userIds).pipe(
map((userId) => ({ id: userId, name: "George" })),
toArray(),
)
const userToFetch$ = new Subject<string>()
const fetchedUser$ = userToFetch$.pipe(
bufferTime(1000),
mergeMap((userIds) => functionThatSimulateAFetch(userIds)),
share(),
)
const fetchUser = (userId: string) => {
const observable = fetchedUser$.pipe(
map((users) => users.find((user) => user.id === userId)),
filter((user) => !!user),
take(1),
)
userToFetch$.next(userId)
return observable
}
但这很丑陋,而且有很多问题:
- 如果我在
bufferTime
的计时器结束之前取消订阅由 fetchUser
编辑的可观察对象 return,它不会阻止用户的获取。
- 如果我在批处理完成之前取消订阅由
fetchUser
编辑的所有可观察对象 return,它不会取消请求。
- 错误处理更复杂
- 等
更一般地说:我不知道如何使用 RxJS 解决需要 共享资源 的问题。很难找到 RxJS 的高级示例。
我不确定这是否是解决这个问题的最佳方法(至少它需要测试),但我会尽力解释我的观点。
我们有 2 个 queue
:用于待定和功能请求。
result
帮助 response/error 交付给订阅者。
某种基于某种时间表的工作人员从队列中取出任务来执行请求。
If i unsubscribe from the observable returned by fetchUser before the
timer of bufferTime is finished, it doesn't prevent the fetch of the
user.
取消订阅 fetchUser
将清理 request queue
,worker
将不执行任何操作。
If i unsubscribe from all the observables returned by fetchUser before
the fetch of the batch is finished, it doesn't cancel the request.
工人订阅until isNothingRemain$
const functionThatSimulateAFetch = (userIds: string[]) => from(userIds).pipe(
map((userId) => ({ id: userId, name: "George" })),
toArray(),
tap(() => console.log('API_CALL', userIds)),
delay(200),
)
class Queue {
queue$ = new BehaviorSubject(new Map());
private get currentQueue() {
return new Map(this.queue$.getValue());
}
add(...ids) {
const newMap = ids.reduce((acc, id) => {
acc.set(id, (acc.get(id) || 0) + 1);
return acc;
}, this.currentQueue);
this.queue$.next(newMap);
};
addMap(idmap: Map<any, any>) {
const newMap = (Array.from(idmap.keys()))
.reduce((acc, id) => {
acc.set(id, (acc.get(id) || 0) + idmap.get(id));
return acc;
}, this.currentQueue);
this.queue$.next(newMap);
}
remove(...ids) {
const newMap = ids.reduce((acc, id) => {
acc.get(id) > 1 ? acc.set(id, acc.get(id) - 1) : acc.delete(id);
return acc;
}, this.currentQueue)
this.queue$.next(newMap);
};
removeMap(idmap: Map<any, any>) {
const newMap = (Array.from(idmap.keys()))
.reduce((acc, id) => {
acc.get(id) > idmap.get(id) ? acc.set(id, acc.get(id) - idmap.get(id)) : acc.delete(id);
return acc;
}, this.currentQueue)
this.queue$.next(newMap);
};
has(id) {
return this.queue$.getValue().has(id);
}
asObservable() {
return this.queue$.asObservable();
}
}
class Result {
result$ = new BehaviorSubject({ ids: new Map(), isError: null, value: null });
select(id) {
return this.result$.pipe(
filter(({ ids }) => ids.has(id)),
switchMap(({ isError, value }) => isError ? throwError(value) : of(value.find(x => x.id === id)))
)
}
add({ isError, value, ids }) {
this.result$.next({ ids, isError, value });
}
clear(){
this.result$.next({ ids: new Map(), isError: null, value: null });
}
}
const result = new Result();
const queueToSend = new Queue();
const queuePending = new Queue();
const doRequest = new Subject();
const fetchUser = (id: string) => {
return Observable.create(observer => {
queueToSend.add(id);
doRequest.next();
const subscription = result
.select(id)
.pipe(take(1))
.subscribe(observer);
// cleanup queue after got response or unsubscribe
return () => {
(queueToSend.has(id) ? queueToSend : queuePending).remove(id);
subscription.unsubscribe();
}
})
}
// some kind of worker that take task from queue and send requests
doRequest.asObservable().pipe(
auditTime(1000),
// clear outdated results
tap(()=>result.clear()),
withLatestFrom(queueToSend.asObservable()),
map(([_, queue]) => queue),
filter(ids => !!ids.size),
mergeMap(ids => {
// abort the request if it have no subscribers
const isNothingRemain$ = combineLatest(queueToSend.asObservable(), queuePending.asObservable()).pipe(
map(([queueToSendIds, queuePendingIds]) => Array.from(ids.keys()).some(k => queueToSendIds.has(k) || queuePendingIds.has(k))),
filter(hasSameKey => !hasSameKey)
)
// prevent to request the same ids if previous requst is not complete
queueToSend.removeMap(ids);
queuePending.addMap(ids);
return functionThatSimulateAFetch(Array.from(ids.keys())).pipe(
map(res => ({ isErorr: false, value: res, ids })),
takeUntil(isNothingRemain$),
catchError(error => of({ isError: true, value: error, ids }))
)
}),
).subscribe(res => result.add(res))
fetchUser('1').subscribe(console.log);
const subs = fetchUser('2').subscribe(console.log);
subs.unsubscribe();
fetchUser('3').subscribe(console.log);
setTimeout(() => {
const subs1 = fetchUser('10').subscribe(console.log);
subs1.unsubscribe();
const subs2 = fetchUser('11').subscribe(console.log);
subs2.unsubscribe();
}, 2000)
setTimeout(() => {
const subs1 = fetchUser('20').subscribe(console.log);
subs1.unsubscribe();
const subs21 = fetchUser('20').subscribe(console.log);
const subs22 = fetchUser('20').subscribe(console.log);
}, 4000)
// API_CALL
// ["1", "3"]
// {id: "1", name: "George"}
// {id: "3", name: "George"}
// API_CALL
// ["20"]
// {id: "20", name: "George"}
// {id: "20", name: "George"}
我认为@Biggy 是对的。
这是我理解问题的方式以及你想要实现的目标
- 您的应用中有不同的地方您想要获取用户
- 你不想一直触发获取请求,而是你
想要缓冲它们并以一定的时间间隔发送它们,
假设 1 秒
- 您想取消某个缓冲区并避免那 1 秒
interval 触发获取一批用户的请求
- 同时,如果有人,我们称之为位置代码
X 已请求用户,几毫秒后有人
否则,即 位置 Y 的代码取消整批
请求,然后 位置 X 的代码必须接收某种
回答,比方说
null
- 更多,您可能希望能够请求获取用户然后更改
你的想法,如果在缓冲时间的间隔内,并且避免
要获取的这个用户(我不确定这真的是什么东西
你想要,但它似乎以某种方式从你的问题中出现
如果这一切都是真的,那么您可能需要某种排队机制,正如 Buggy 所建议的那样。
那么这种机制的实现可能会有很多。
仅供参考,我尝试使用以下答案创建通用批处理任务队列
@buggy & @picci :
import { Observable, Subject, BehaviorSubject, from, timer } from "rxjs"
import { catchError, share, mergeMap, map, filter, takeUntil, take, bufferTime, timeout, concatMap } from "rxjs/operators"
export interface Task<TInput> {
uid: number
input: TInput
}
interface ErroredTask<TInput> extends Task<TInput> {
error: any
}
interface SucceededTask<TInput, TOutput> extends Task<TInput> {
output: TOutput
}
export type FinishedTask<TInput, TOutput> = ErroredTask<TInput> | SucceededTask<TInput, TOutput>
const taskErrored = <TInput, TOutput>(
taskFinished: FinishedTask<TInput, TOutput>,
): taskFinished is ErroredTask<TInput> => !!(taskFinished as ErroredTask<TInput>).error
type BatchedWorker<TInput, TOutput> = (tasks: Array<Task<TInput>>) => Observable<FinishedTask<TInput, TOutput>>
export const createSimpleBatchedWorker = <TInput, TOutput>(
work: (inputs: TInput[]) => Observable<TOutput[]>,
workTimeout: number,
): BatchedWorker<TInput, TOutput> => (
tasks: Array<Task<TInput>>,
) => work(
tasks.map((task) => task.input),
).pipe(
mergeMap((outputs) => from(tasks.map((task, index) => ({
...task,
output: outputs[index],
})))),
timeout(workTimeout),
catchError((error) => from(tasks.map((task) => ({
...task,
error,
})))),
)
export const createBatchedTaskQueue = <TInput, TOutput>(
worker: BatchedWorker<TInput, TOutput>,
concurrencyLimit: number = 1,
batchTimeout: number = 0,
maxBatchSize: number = Number.POSITIVE_INFINITY,
) => {
const taskSubject = new Subject<Task<TInput>>()
const cancelTaskSubject = new BehaviorSubject<Set<number>>(new Set())
const cancelTask = (task: Task<TInput>) => {
const cancelledUids = cancelTaskSubject.getValue()
const newCancelledUids = new Set(cancelledUids)
newCancelledUids.add(task.uid)
cancelTaskSubject.next(newCancelledUids)
}
const output$: Observable<FinishedTask<TInput, TOutput>> = taskSubject.pipe(
bufferTime(batchTimeout, undefined, maxBatchSize),
map((tasks) => {
const cancelledUids = cancelTaskSubject.getValue()
return tasks.filter((task) => !cancelledUids.has(task.uid))
}),
filter((tasks) => tasks.length > 0),
mergeMap(
(tasks) => worker(tasks).pipe(
takeUntil(cancelTaskSubject.pipe(
filter((uids) => {
for (const task of tasks) {
if (!uids.has(task.uid)) {
return false
}
}
return true
}),
)),
),
undefined,
concurrencyLimit,
),
share(),
)
let nextUid = 0
return (input$: Observable<TInput>): Observable<TOutput> => input$.pipe(
concatMap((input) => new Observable<TOutput>((observer) => {
const task = {
uid: nextUid++,
input,
}
const subscription = output$.pipe(
filter((taskFinished) => taskFinished.uid === task.uid),
take(1),
map((taskFinished) => {
if (taskErrored(taskFinished)) {
throw taskFinished.error
}
return taskFinished.output
}),
).subscribe(observer)
subscription.add(
timer(0).subscribe(() => taskSubject.next(task)),
)
return () => {
subscription.unsubscribe()
cancelTask(task)
}
})),
)
}
以我们为例:
import { from } from "rxjs"
import { map, toArray } from "rxjs/operators"
import { createBatchedTaskQueue, createSimpleBatchedWorker } from "mmr/components/rxjs/batched-task-queue"
const functionThatSimulateAFetch = (userIds: string[]) => from(userIds).pipe(
map((userId) => ({ id: userId, name: "George" })),
toArray(),
)
const userFetchQueue = createBatchedTaskQueue(
createSimpleBatchedWorker(
functionThatSimulateAFetch,
10000,
),
)
const fetchUser = (userId: string) => {
return from(userId).pipe(
userFetchQueue,
)
}
我愿意接受任何改进建议
你拥有的是好的,但与 RxJS 的一切一样,细节决定成败。
问题
-
switchMap
ing
mergeMap((userIds) => functionThatSimulateAFetch(userIds)),
这是你第一次出错的地方。通过在此处使用合并映射,您无法区分 "stream of requests" 和 "stream returned by a single request":
- 您几乎不可能取消订阅单个请求(取消它)
- 你让处理错误变得不可能
- 如果你的内部 observable 发射不止一次,它就会崩溃。
相反,您想要的是通过正常的 map
(产生可观察的可观察)发出单个 BatchEvent
s,以及 switchMap
/mergeMap
那些过滤后。
- 订阅前创建可观察和发射时的副作用
userToFetch$.next(userId)
return observable
不要这样做。可观察对象本身实际上并没有做任何事情。它是 "blueprint" 当您订阅它时 会发生一系列操作。通过这样做,您只会在可观察创建上创建一个批处理操作,但如果您获得多个或延迟订阅,您就会搞砸了。
相反,您想从 defer
创建一个可观察对象,在每次订阅时发送到 userToFetch$
。
即便如此,您还是希望在 发送到 userToFetch
之前订阅您的 observable : , 事件将丢失。您可以在类似延迟的可观察对象中执行此操作。
解决方案
简短,与您的代码差别不大,但结构如下。
const BUFFER_TIME = 1000;
type BatchEvent = { keys: Set<string>, values: Observable<Users> };
/** The incoming keys */
const keySubject = new Subject<string>();
const requests: Observable<{ keys: Set<string>, values: Observable<Users> }> =
this.keySubject.asObservable().pipe(
bufferTime(BUFFER_TIME),
map(keys => this.fetchBatch(keys)),
share(),
);
/** Returns a single User from an ID. Batches the request */
function get(userId: string): Observable<User> {
console.log("Creating observable for:", userId);
// The money observable. See "defer":
// triggers a new subject event on subscription
const observable = new Observable<BatchEvent>(observer => {
this.requests.subscribe(observer);
// Emit *after* the subscription
this.keySubject.next(userId);
});
return observable.pipe(
first(v => v.keys.has(userId)),
// There is only 1 item, so any *Map will do here
switchMap(v => v.values),
map(v => v[userId]),
);
}
function fetchBatch(args: string[]): BatchEvent {
const keys = new Set(args); // Do not batch duplicates
const values = this.userService.get(Array.from(keys)).pipe(
share(),
);
return { keys, values };
}
这完全符合您的要求,包括:
- 错误会传播给批处理调用的接收者,但不会传播给其他人
- 如果每个人都取消订阅一个批次,则 observable 被取消
- 如果每个人都在请求被触发之前取消订阅一个批次,那么它永远不会被触发
- observable 的行为类似于 HttpClient:订阅 observable 会触发新的(批处理的)数据请求。来电者可以自由使用管道
shareReplay
或其他任何方式。所以没有惊喜。
这是一个有效的 stackblitz Angular 演示:https://stackblitz.com/edit/angular-rxjs-batch-request
特别注意 "toggle" 显示时的行为:您会注意到重新订阅现有的可观察对象将触发新的批处理请求,并且这些请求将取消(或完全不触发)如果你重新切换的速度足够快。
用例
在我们的项目中,我们将其用于 Angular 表,其中每一行都需要单独获取额外的数据来呈现。这使我们能够:
- 分块 "single page" 的所有请求,不需要任何分页的特殊知识
- 如果用户快速分页,可能一次获取多个页面
- 重新使用现有结果,即使页面大小发生变化
限制
我不会在其中添加分块或速率限制。因为 source observable 是一个愚蠢的 bufferTime
你 运行 进入问题:
- "chunking" 将在重复数据删除之前发生。因此,如果您有 100 个针对单个 userId 的请求,您最终将只用 1 个元素触发多个请求
- 如果您设置速率限制,您将无法检查您的队列。所以你最终可能会得到一个包含多个相同请求的很长的队列。
虽然这是一个悲观的观点。修复它意味着要完全使用有状态的 queue/batch 机制,这要复杂一个数量级。
让我们假设我有一个函数 fetchUser
,它将 userId
和 return 作为参数 用户 的可观察值。
因为我经常调用此方法,所以我想 批处理 ID 以执行一个具有多个 ID 的请求!
我的麻烦开始了...
如果不在 fetchUser
的不同调用之间共享一个 observable,我找不到解决方案。
import { Subject, from } from "rxjs"
import { bufferTime, mergeMap, map, toArray, filter, take, share } from "rxjs/operators"
const functionThatSimulateAFetch = (userIds: string[]) => from(userIds).pipe(
map((userId) => ({ id: userId, name: "George" })),
toArray(),
)
const userToFetch$ = new Subject<string>()
const fetchedUser$ = userToFetch$.pipe(
bufferTime(1000),
mergeMap((userIds) => functionThatSimulateAFetch(userIds)),
share(),
)
const fetchUser = (userId: string) => {
const observable = fetchedUser$.pipe(
map((users) => users.find((user) => user.id === userId)),
filter((user) => !!user),
take(1),
)
userToFetch$.next(userId)
return observable
}
但这很丑陋,而且有很多问题:
- 如果我在
bufferTime
的计时器结束之前取消订阅由fetchUser
编辑的可观察对象 return,它不会阻止用户的获取。 - 如果我在批处理完成之前取消订阅由
fetchUser
编辑的所有可观察对象 return,它不会取消请求。 - 错误处理更复杂
- 等
更一般地说:我不知道如何使用 RxJS 解决需要 共享资源 的问题。很难找到 RxJS 的高级示例。
我不确定这是否是解决这个问题的最佳方法(至少它需要测试),但我会尽力解释我的观点。
我们有 2 个 queue
:用于待定和功能请求。
result
帮助 response/error 交付给订阅者。
某种基于某种时间表的工作人员从队列中取出任务来执行请求。
If i unsubscribe from the observable returned by fetchUser before the timer of bufferTime is finished, it doesn't prevent the fetch of the user.
取消订阅 fetchUser
将清理 request queue
,worker
将不执行任何操作。
If i unsubscribe from all the observables returned by fetchUser before the fetch of the batch is finished, it doesn't cancel the request.
工人订阅until isNothingRemain$
const functionThatSimulateAFetch = (userIds: string[]) => from(userIds).pipe(
map((userId) => ({ id: userId, name: "George" })),
toArray(),
tap(() => console.log('API_CALL', userIds)),
delay(200),
)
class Queue {
queue$ = new BehaviorSubject(new Map());
private get currentQueue() {
return new Map(this.queue$.getValue());
}
add(...ids) {
const newMap = ids.reduce((acc, id) => {
acc.set(id, (acc.get(id) || 0) + 1);
return acc;
}, this.currentQueue);
this.queue$.next(newMap);
};
addMap(idmap: Map<any, any>) {
const newMap = (Array.from(idmap.keys()))
.reduce((acc, id) => {
acc.set(id, (acc.get(id) || 0) + idmap.get(id));
return acc;
}, this.currentQueue);
this.queue$.next(newMap);
}
remove(...ids) {
const newMap = ids.reduce((acc, id) => {
acc.get(id) > 1 ? acc.set(id, acc.get(id) - 1) : acc.delete(id);
return acc;
}, this.currentQueue)
this.queue$.next(newMap);
};
removeMap(idmap: Map<any, any>) {
const newMap = (Array.from(idmap.keys()))
.reduce((acc, id) => {
acc.get(id) > idmap.get(id) ? acc.set(id, acc.get(id) - idmap.get(id)) : acc.delete(id);
return acc;
}, this.currentQueue)
this.queue$.next(newMap);
};
has(id) {
return this.queue$.getValue().has(id);
}
asObservable() {
return this.queue$.asObservable();
}
}
class Result {
result$ = new BehaviorSubject({ ids: new Map(), isError: null, value: null });
select(id) {
return this.result$.pipe(
filter(({ ids }) => ids.has(id)),
switchMap(({ isError, value }) => isError ? throwError(value) : of(value.find(x => x.id === id)))
)
}
add({ isError, value, ids }) {
this.result$.next({ ids, isError, value });
}
clear(){
this.result$.next({ ids: new Map(), isError: null, value: null });
}
}
const result = new Result();
const queueToSend = new Queue();
const queuePending = new Queue();
const doRequest = new Subject();
const fetchUser = (id: string) => {
return Observable.create(observer => {
queueToSend.add(id);
doRequest.next();
const subscription = result
.select(id)
.pipe(take(1))
.subscribe(observer);
// cleanup queue after got response or unsubscribe
return () => {
(queueToSend.has(id) ? queueToSend : queuePending).remove(id);
subscription.unsubscribe();
}
})
}
// some kind of worker that take task from queue and send requests
doRequest.asObservable().pipe(
auditTime(1000),
// clear outdated results
tap(()=>result.clear()),
withLatestFrom(queueToSend.asObservable()),
map(([_, queue]) => queue),
filter(ids => !!ids.size),
mergeMap(ids => {
// abort the request if it have no subscribers
const isNothingRemain$ = combineLatest(queueToSend.asObservable(), queuePending.asObservable()).pipe(
map(([queueToSendIds, queuePendingIds]) => Array.from(ids.keys()).some(k => queueToSendIds.has(k) || queuePendingIds.has(k))),
filter(hasSameKey => !hasSameKey)
)
// prevent to request the same ids if previous requst is not complete
queueToSend.removeMap(ids);
queuePending.addMap(ids);
return functionThatSimulateAFetch(Array.from(ids.keys())).pipe(
map(res => ({ isErorr: false, value: res, ids })),
takeUntil(isNothingRemain$),
catchError(error => of({ isError: true, value: error, ids }))
)
}),
).subscribe(res => result.add(res))
fetchUser('1').subscribe(console.log);
const subs = fetchUser('2').subscribe(console.log);
subs.unsubscribe();
fetchUser('3').subscribe(console.log);
setTimeout(() => {
const subs1 = fetchUser('10').subscribe(console.log);
subs1.unsubscribe();
const subs2 = fetchUser('11').subscribe(console.log);
subs2.unsubscribe();
}, 2000)
setTimeout(() => {
const subs1 = fetchUser('20').subscribe(console.log);
subs1.unsubscribe();
const subs21 = fetchUser('20').subscribe(console.log);
const subs22 = fetchUser('20').subscribe(console.log);
}, 4000)
// API_CALL
// ["1", "3"]
// {id: "1", name: "George"}
// {id: "3", name: "George"}
// API_CALL
// ["20"]
// {id: "20", name: "George"}
// {id: "20", name: "George"}
我认为@Biggy 是对的。
这是我理解问题的方式以及你想要实现的目标
- 您的应用中有不同的地方您想要获取用户
- 你不想一直触发获取请求,而是你 想要缓冲它们并以一定的时间间隔发送它们, 假设 1 秒
- 您想取消某个缓冲区并避免那 1 秒 interval 触发获取一批用户的请求
- 同时,如果有人,我们称之为位置代码
X 已请求用户,几毫秒后有人
否则,即 位置 Y 的代码取消整批
请求,然后 位置 X 的代码必须接收某种
回答,比方说
null
- 更多,您可能希望能够请求获取用户然后更改 你的想法,如果在缓冲时间的间隔内,并且避免 要获取的这个用户(我不确定这真的是什么东西 你想要,但它似乎以某种方式从你的问题中出现
如果这一切都是真的,那么您可能需要某种排队机制,正如 Buggy 所建议的那样。
那么这种机制的实现可能会有很多。
仅供参考,我尝试使用以下答案创建通用批处理任务队列 @buggy & @picci :
import { Observable, Subject, BehaviorSubject, from, timer } from "rxjs"
import { catchError, share, mergeMap, map, filter, takeUntil, take, bufferTime, timeout, concatMap } from "rxjs/operators"
export interface Task<TInput> {
uid: number
input: TInput
}
interface ErroredTask<TInput> extends Task<TInput> {
error: any
}
interface SucceededTask<TInput, TOutput> extends Task<TInput> {
output: TOutput
}
export type FinishedTask<TInput, TOutput> = ErroredTask<TInput> | SucceededTask<TInput, TOutput>
const taskErrored = <TInput, TOutput>(
taskFinished: FinishedTask<TInput, TOutput>,
): taskFinished is ErroredTask<TInput> => !!(taskFinished as ErroredTask<TInput>).error
type BatchedWorker<TInput, TOutput> = (tasks: Array<Task<TInput>>) => Observable<FinishedTask<TInput, TOutput>>
export const createSimpleBatchedWorker = <TInput, TOutput>(
work: (inputs: TInput[]) => Observable<TOutput[]>,
workTimeout: number,
): BatchedWorker<TInput, TOutput> => (
tasks: Array<Task<TInput>>,
) => work(
tasks.map((task) => task.input),
).pipe(
mergeMap((outputs) => from(tasks.map((task, index) => ({
...task,
output: outputs[index],
})))),
timeout(workTimeout),
catchError((error) => from(tasks.map((task) => ({
...task,
error,
})))),
)
export const createBatchedTaskQueue = <TInput, TOutput>(
worker: BatchedWorker<TInput, TOutput>,
concurrencyLimit: number = 1,
batchTimeout: number = 0,
maxBatchSize: number = Number.POSITIVE_INFINITY,
) => {
const taskSubject = new Subject<Task<TInput>>()
const cancelTaskSubject = new BehaviorSubject<Set<number>>(new Set())
const cancelTask = (task: Task<TInput>) => {
const cancelledUids = cancelTaskSubject.getValue()
const newCancelledUids = new Set(cancelledUids)
newCancelledUids.add(task.uid)
cancelTaskSubject.next(newCancelledUids)
}
const output$: Observable<FinishedTask<TInput, TOutput>> = taskSubject.pipe(
bufferTime(batchTimeout, undefined, maxBatchSize),
map((tasks) => {
const cancelledUids = cancelTaskSubject.getValue()
return tasks.filter((task) => !cancelledUids.has(task.uid))
}),
filter((tasks) => tasks.length > 0),
mergeMap(
(tasks) => worker(tasks).pipe(
takeUntil(cancelTaskSubject.pipe(
filter((uids) => {
for (const task of tasks) {
if (!uids.has(task.uid)) {
return false
}
}
return true
}),
)),
),
undefined,
concurrencyLimit,
),
share(),
)
let nextUid = 0
return (input$: Observable<TInput>): Observable<TOutput> => input$.pipe(
concatMap((input) => new Observable<TOutput>((observer) => {
const task = {
uid: nextUid++,
input,
}
const subscription = output$.pipe(
filter((taskFinished) => taskFinished.uid === task.uid),
take(1),
map((taskFinished) => {
if (taskErrored(taskFinished)) {
throw taskFinished.error
}
return taskFinished.output
}),
).subscribe(observer)
subscription.add(
timer(0).subscribe(() => taskSubject.next(task)),
)
return () => {
subscription.unsubscribe()
cancelTask(task)
}
})),
)
}
以我们为例:
import { from } from "rxjs"
import { map, toArray } from "rxjs/operators"
import { createBatchedTaskQueue, createSimpleBatchedWorker } from "mmr/components/rxjs/batched-task-queue"
const functionThatSimulateAFetch = (userIds: string[]) => from(userIds).pipe(
map((userId) => ({ id: userId, name: "George" })),
toArray(),
)
const userFetchQueue = createBatchedTaskQueue(
createSimpleBatchedWorker(
functionThatSimulateAFetch,
10000,
),
)
const fetchUser = (userId: string) => {
return from(userId).pipe(
userFetchQueue,
)
}
我愿意接受任何改进建议
你拥有的是好的,但与 RxJS 的一切一样,细节决定成败。
问题
-
switchMap
ing
mergeMap((userIds) => functionThatSimulateAFetch(userIds)),
这是你第一次出错的地方。通过在此处使用合并映射,您无法区分 "stream of requests" 和 "stream returned by a single request":
- 您几乎不可能取消订阅单个请求(取消它)
- 你让处理错误变得不可能
- 如果你的内部 observable 发射不止一次,它就会崩溃。
相反,您想要的是通过正常的 map
(产生可观察的可观察)发出单个 BatchEvent
s,以及 switchMap
/mergeMap
那些过滤后。
- 订阅前创建可观察和发射时的副作用
userToFetch$.next(userId)
return observable
不要这样做。可观察对象本身实际上并没有做任何事情。它是 "blueprint" 当您订阅它时 会发生一系列操作。通过这样做,您只会在可观察创建上创建一个批处理操作,但如果您获得多个或延迟订阅,您就会搞砸了。
相反,您想从 defer
创建一个可观察对象,在每次订阅时发送到 userToFetch$
。
即便如此,您还是希望在 发送到 userToFetch
之前订阅您的 observable : , 事件将丢失。您可以在类似延迟的可观察对象中执行此操作。
解决方案
简短,与您的代码差别不大,但结构如下。
const BUFFER_TIME = 1000;
type BatchEvent = { keys: Set<string>, values: Observable<Users> };
/** The incoming keys */
const keySubject = new Subject<string>();
const requests: Observable<{ keys: Set<string>, values: Observable<Users> }> =
this.keySubject.asObservable().pipe(
bufferTime(BUFFER_TIME),
map(keys => this.fetchBatch(keys)),
share(),
);
/** Returns a single User from an ID. Batches the request */
function get(userId: string): Observable<User> {
console.log("Creating observable for:", userId);
// The money observable. See "defer":
// triggers a new subject event on subscription
const observable = new Observable<BatchEvent>(observer => {
this.requests.subscribe(observer);
// Emit *after* the subscription
this.keySubject.next(userId);
});
return observable.pipe(
first(v => v.keys.has(userId)),
// There is only 1 item, so any *Map will do here
switchMap(v => v.values),
map(v => v[userId]),
);
}
function fetchBatch(args: string[]): BatchEvent {
const keys = new Set(args); // Do not batch duplicates
const values = this.userService.get(Array.from(keys)).pipe(
share(),
);
return { keys, values };
}
这完全符合您的要求,包括:
- 错误会传播给批处理调用的接收者,但不会传播给其他人
- 如果每个人都取消订阅一个批次,则 observable 被取消
- 如果每个人都在请求被触发之前取消订阅一个批次,那么它永远不会被触发
- observable 的行为类似于 HttpClient:订阅 observable 会触发新的(批处理的)数据请求。来电者可以自由使用管道
shareReplay
或其他任何方式。所以没有惊喜。
这是一个有效的 stackblitz Angular 演示:https://stackblitz.com/edit/angular-rxjs-batch-request
特别注意 "toggle" 显示时的行为:您会注意到重新订阅现有的可观察对象将触发新的批处理请求,并且这些请求将取消(或完全不触发)如果你重新切换的速度足够快。
用例
在我们的项目中,我们将其用于 Angular 表,其中每一行都需要单独获取额外的数据来呈现。这使我们能够:
- 分块 "single page" 的所有请求,不需要任何分页的特殊知识
- 如果用户快速分页,可能一次获取多个页面
- 重新使用现有结果,即使页面大小发生变化
限制
我不会在其中添加分块或速率限制。因为 source observable 是一个愚蠢的 bufferTime
你 运行 进入问题:
- "chunking" 将在重复数据删除之前发生。因此,如果您有 100 个针对单个 userId 的请求,您最终将只用 1 个元素触发多个请求
- 如果您设置速率限制,您将无法检查您的队列。所以你最终可能会得到一个包含多个相同请求的很长的队列。
虽然这是一个悲观的观点。修复它意味着要完全使用有状态的 queue/batch 机制,这要复杂一个数量级。