RxJS:批量请求和共享响应

RxJS: Batch requests and share response

让我们假设我有一个函数 fetchUser,它将 userId 和 return 作为参数 用户 的可观察值。

因为我经常调用此方法,所以我想 批处理 ID 以执行一个具有多个 ID 的请求!

我的麻烦开始了...

如果不在 fetchUser 的不同调用之间共享一个 observable,我找不到解决方案。

import { Subject, from } from "rxjs"
import { bufferTime, mergeMap, map, toArray, filter, take, share } from "rxjs/operators"

const functionThatSimulateAFetch = (userIds: string[]) => from(userIds).pipe(
    map((userId) => ({ id: userId, name: "George" })),
    toArray(),
)

const userToFetch$ = new Subject<string>()

const fetchedUser$ = userToFetch$.pipe(
    bufferTime(1000),
    mergeMap((userIds) => functionThatSimulateAFetch(userIds)),
    share(),
)

const fetchUser = (userId: string) => {
    const observable = fetchedUser$.pipe(
        map((users) => users.find((user) => user.id === userId)),
        filter((user) => !!user),
        take(1),
    )
    userToFetch$.next(userId)
    return observable
}

但这很丑陋,而且有很多问题:

更一般地说:我不知道如何使用 RxJS 解决需要 共享资源 的问题。很难找到 RxJS 的高级示例。

我不确定这是否是解决这个问题的最佳方法(至少它需要测试),但我会尽力解释我的观点。

我们有 2 个 queue:用于待定和功能请求。
result 帮助 response/error 交付给订阅者。
某种基于某种时间表的工作人员从队列中取出任务来执行请求。

If i unsubscribe from the observable returned by fetchUser before the timer of bufferTime is finished, it doesn't prevent the fetch of the user.

取消订阅 fetchUser 将清理 request queueworker 将不执行任何操作。

If i unsubscribe from all the observables returned by fetchUser before the fetch of the batch is finished, it doesn't cancel the request.

工人订阅until isNothingRemain$

const functionThatSimulateAFetch = (userIds: string[]) => from(userIds).pipe(

  map((userId) => ({ id: userId, name: "George" })),
  toArray(),
  tap(() => console.log('API_CALL', userIds)),
  delay(200),
)

class Queue {
  queue$ = new BehaviorSubject(new Map());

  private get currentQueue() {
    return new Map(this.queue$.getValue());
  }

  add(...ids) {
    const newMap = ids.reduce((acc, id) => {
      acc.set(id, (acc.get(id) || 0) + 1);
      return acc;
    }, this.currentQueue);
    this.queue$.next(newMap);
  };

  addMap(idmap: Map<any, any>) {

    const newMap = (Array.from(idmap.keys()))
      .reduce((acc, id) => {
        acc.set(id, (acc.get(id) || 0) + idmap.get(id));
        return acc;
      }, this.currentQueue);
    this.queue$.next(newMap);
  }

  remove(...ids) {
    const newMap = ids.reduce((acc, id) => {
      acc.get(id) > 1 ? acc.set(id, acc.get(id) - 1) : acc.delete(id);
      return acc;
    }, this.currentQueue)
    this.queue$.next(newMap);
  };

  removeMap(idmap: Map<any, any>) {
    const newMap = (Array.from(idmap.keys()))
      .reduce((acc, id) => {
        acc.get(id) > idmap.get(id) ? acc.set(id, acc.get(id) - idmap.get(id)) : acc.delete(id);
        return acc;
      }, this.currentQueue)
    this.queue$.next(newMap);
  };

  has(id) {
    return this.queue$.getValue().has(id);
  }

  asObservable() {
    return this.queue$.asObservable();
  }
}

class Result {
  result$ = new BehaviorSubject({ ids: new Map(), isError: null, value: null });
  select(id) {
    return this.result$.pipe(
      filter(({ ids }) => ids.has(id)),
      switchMap(({ isError, value }) => isError ? throwError(value) : of(value.find(x => x.id === id)))
    )
  }
  add({ isError, value, ids }) {
    this.result$.next({ ids, isError, value });
  }

  clear(){
    this.result$.next({ ids: new Map(), isError: null, value: null });
  }
}

const result = new Result();
const queueToSend = new Queue();
const queuePending = new Queue();
const doRequest = new Subject();

const fetchUser = (id: string) => {
  return Observable.create(observer => {
    queueToSend.add(id);
    doRequest.next();

    const subscription = result
      .select(id)
      .pipe(take(1))
      .subscribe(observer);

    // cleanup queue after got response or unsubscribe
    return () => {
      (queueToSend.has(id) ? queueToSend : queuePending).remove(id);
      subscription.unsubscribe();
    }
  })
}


// some kind of worker that take task from queue and send requests
doRequest.asObservable().pipe(
  auditTime(1000),
  // clear outdated results
  tap(()=>result.clear()),
  withLatestFrom(queueToSend.asObservable()),
  map(([_, queue]) => queue),
  filter(ids => !!ids.size),
  mergeMap(ids => {
    // abort the request if it have no subscribers
    const isNothingRemain$ = combineLatest(queueToSend.asObservable(), queuePending.asObservable()).pipe(
      map(([queueToSendIds, queuePendingIds]) => Array.from(ids.keys()).some(k => queueToSendIds.has(k) || queuePendingIds.has(k))),
      filter(hasSameKey => !hasSameKey)
    )

    // prevent to request the same ids if previous requst is not complete
    queueToSend.removeMap(ids);
    queuePending.addMap(ids);
    return functionThatSimulateAFetch(Array.from(ids.keys())).pipe(
      map(res => ({ isErorr: false, value: res, ids })),
      takeUntil(isNothingRemain$),
      catchError(error => of({ isError: true, value: error, ids }))
    )
  }),
).subscribe(res => result.add(res))




fetchUser('1').subscribe(console.log);

const subs = fetchUser('2').subscribe(console.log);
subs.unsubscribe();

fetchUser('3').subscribe(console.log);



setTimeout(() => {
  const subs1 = fetchUser('10').subscribe(console.log);
  subs1.unsubscribe();

  const subs2 = fetchUser('11').subscribe(console.log);
  subs2.unsubscribe();
}, 2000)


setTimeout(() => {
  const subs1 = fetchUser('20').subscribe(console.log);
  subs1.unsubscribe();

  const subs21 = fetchUser('20').subscribe(console.log);
  const subs22 = fetchUser('20').subscribe(console.log);
}, 4000)


// API_CALL
// ["1", "3"]
// {id: "1", name: "George"}
// {id: "3", name: "George"}
// API_CALL
// ["20"]
// {id: "20", name: "George"}
// {id: "20", name: "George"}

stackblitz example

我认为@Biggy 是对的。

这是我理解问题的方式以及你想要实现的目标

  1. 您的应用中有不同的地方您想要获取用户
  2. 你不想一直触发获取请求,而是你 想要缓冲它们并以一定的时间间隔发送它们, 假设 1 秒
  3. 您想取消某个缓冲区并避免那 1 秒 interval 触发获取一批用户的请求
  4. 同时,如果有人,我们称之为位置代码 X 已请求用户,几毫秒后有人 否则,即 位置 Y 的代码取消整批 请求,然后 位置 X 的代码必须接收某种 回答,比方说 null
  5. 更多,您可能希望能够请求获取用户然后更改 你的想法,如果在缓冲时间的间隔内,并且避免 要获取的这个用户(我不确定这真的是什么东西 你想要,但它似乎以某种方式从你的问题中出现

如果这一切都是真的,那么您可能需要某种排队机制,正如 Buggy 所建议的那样。

那么这种机制的实现可能会有很多。

仅供参考,我尝试使用以下答案创建通用批处理任务队列 @buggy & @picci :

import { Observable, Subject, BehaviorSubject, from, timer } from "rxjs"
import { catchError, share, mergeMap, map, filter, takeUntil, take, bufferTime, timeout, concatMap } from "rxjs/operators"

export interface Task<TInput> {
    uid: number
    input: TInput
}

interface ErroredTask<TInput> extends Task<TInput> {
    error: any
}

interface SucceededTask<TInput, TOutput> extends Task<TInput> {
    output: TOutput
}

export type FinishedTask<TInput, TOutput> = ErroredTask<TInput> | SucceededTask<TInput, TOutput>

const taskErrored = <TInput, TOutput>(
    taskFinished: FinishedTask<TInput, TOutput>,
): taskFinished is ErroredTask<TInput> => !!(taskFinished as ErroredTask<TInput>).error

type BatchedWorker<TInput, TOutput> = (tasks: Array<Task<TInput>>) => Observable<FinishedTask<TInput, TOutput>>

export const createSimpleBatchedWorker = <TInput, TOutput>(
    work: (inputs: TInput[]) => Observable<TOutput[]>,
    workTimeout: number,
): BatchedWorker<TInput, TOutput> => (
    tasks: Array<Task<TInput>>,
) => work(
    tasks.map((task) => task.input),
).pipe(
    mergeMap((outputs) => from(tasks.map((task, index) => ({
        ...task,
        output: outputs[index],
    })))),
    timeout(workTimeout),
    catchError((error) => from(tasks.map((task) => ({
        ...task,
        error,
    })))),
)

export const createBatchedTaskQueue = <TInput, TOutput>(
    worker: BatchedWorker<TInput, TOutput>,
    concurrencyLimit: number = 1,
    batchTimeout: number = 0,
    maxBatchSize: number = Number.POSITIVE_INFINITY,
) => {
    const taskSubject = new Subject<Task<TInput>>()
    const cancelTaskSubject = new BehaviorSubject<Set<number>>(new Set())
    const cancelTask = (task: Task<TInput>) => {
        const cancelledUids = cancelTaskSubject.getValue()
        const newCancelledUids = new Set(cancelledUids)
        newCancelledUids.add(task.uid)
        cancelTaskSubject.next(newCancelledUids)
    }
    const output$: Observable<FinishedTask<TInput, TOutput>> = taskSubject.pipe(
        bufferTime(batchTimeout, undefined, maxBatchSize),
        map((tasks) => {
          const cancelledUids = cancelTaskSubject.getValue()
          return tasks.filter((task) => !cancelledUids.has(task.uid))
        }),
        filter((tasks) => tasks.length > 0),
        mergeMap(
            (tasks) => worker(tasks).pipe(
                takeUntil(cancelTaskSubject.pipe(
                    filter((uids) => {
                        for (const task of tasks) {
                            if (!uids.has(task.uid)) {
                                return false
                            }
                        }
                        return true
                    }),
                )),
            ),
            undefined,
            concurrencyLimit,
        ),
        share(),
    )
    let nextUid = 0
    return (input$: Observable<TInput>): Observable<TOutput> => input$.pipe(
        concatMap((input) => new Observable<TOutput>((observer) => {
            const task = {
                uid: nextUid++,
                input,
            }
            const subscription = output$.pipe(
                filter((taskFinished) => taskFinished.uid === task.uid),
                take(1),
                map((taskFinished) => {
                    if (taskErrored(taskFinished)) {
                        throw taskFinished.error
                    }
                    return taskFinished.output
                }),
            ).subscribe(observer)
            subscription.add(
                timer(0).subscribe(() => taskSubject.next(task)),
            )
            return () => {
                subscription.unsubscribe()
                cancelTask(task)
            }
        })),
    )
}

以我们为例:

import { from } from "rxjs"
import { map, toArray } from "rxjs/operators"
import { createBatchedTaskQueue, createSimpleBatchedWorker } from "mmr/components/rxjs/batched-task-queue"

const functionThatSimulateAFetch = (userIds: string[]) => from(userIds).pipe(
    map((userId) => ({ id: userId, name: "George" })),
    toArray(),
)

const userFetchQueue = createBatchedTaskQueue(
    createSimpleBatchedWorker(
        functionThatSimulateAFetch,
        10000,
    ),
)

const fetchUser = (userId: string) => {
    return from(userId).pipe(
        userFetchQueue,
    )
}

我愿意接受任何改进建议

你拥有的是好的,但与 RxJS 的一切一样,细节决定成败。

问题

  1. switchMaping
        mergeMap((userIds) => functionThatSimulateAFetch(userIds)),

这是你第一次出错的地方。通过在此处使用合并映射,您无法区分 "stream of requests" 和 "stream returned by a single request":

  • 您几乎不可能取消订阅单个请求(取消它)
  • 你让处理错误变得不可能
  • 如果你的内部 observable 发射不止一次,它就会崩溃。

相反,您想要的是通过正常的 map(产生可观察的可观察)发出单个 BatchEvents,以及 switchMap/mergeMap 那些过滤后

  1. 订阅前创建可观察和发射时的副作用
    userToFetch$.next(userId)
    return observable

不要这样做。可观察对象本身实际上并没有做任何事情。它是 "blueprint" 当您订阅它时 会发生一系列操作。通过这样做,您只会在可观察创建上创建一个批处理操作,但如果您获得多个或延迟订阅,您就会搞砸了。

相反,您想从 defer 创建一个可观察对象,在每次订阅时发送到 userToFetch$

即便如此,您还是希望在 发送到 userToFetch 之前订阅您的 observable : , 事件将丢失。您可以在类似延迟的可观察对象中执行此操作。

解决方案

简短,与您的代码差别不大,但结构如下。

const BUFFER_TIME = 1000;

type BatchEvent = { keys: Set<string>, values: Observable<Users> };

/** The incoming keys */
const keySubject = new Subject<string>();

const requests: Observable<{ keys: Set<string>, values: Observable<Users> }> =
  this.keySubject.asObservable().pipe(
    bufferTime(BUFFER_TIME),
    map(keys => this.fetchBatch(keys)),
    share(),
  );

/** Returns a single User from an ID. Batches the request */
function get(userId: string): Observable<User> {
  console.log("Creating observable for:", userId);
  // The money observable. See "defer":
  // triggers a new subject event on subscription
  const observable = new Observable<BatchEvent>(observer => {
    this.requests.subscribe(observer);
    // Emit *after* the subscription
    this.keySubject.next(userId);
  });
  return observable.pipe(
    first(v => v.keys.has(userId)),
    // There is only 1 item, so any *Map will do here
    switchMap(v => v.values),
    map(v => v[userId]),
  );
}

function fetchBatch(args: string[]): BatchEvent {
  const keys = new Set(args); // Do not batch duplicates
  const values = this.userService.get(Array.from(keys)).pipe(
    share(),
  );
  return { keys, values };
}

这完全符合您的要求,包括:

  • 错误会传播给批处理调用的接收者,但不会传播给其他人
  • 如果每个人都取消订阅一个批次,则 observable 被取消
  • 如果每个人都在请求被触发之前取消订阅一个批次,那么它永远不会被触发
  • observable 的行为类似于 HttpClient:订阅 observable 会触发新的(批处理的)数据请求。来电者可以自由使用管道 shareReplay 或其他任何方式。所以没有惊喜。

这是一个有效的 stackblitz Angular 演示:https://stackblitz.com/edit/angular-rxjs-batch-request

特别注意 "toggle" 显示时的行为:您会注意到重新订阅现有的可观察对象将触发新的批处理请求,并且这些请求将取消(或完全不触发)如果你重新切换的速度足够快。

用例

在我们的项目中,我们将其用于 Angular 表,其中每一行都需要单独获取额外的数据来呈现。这使我们能够:

  • 分块 "single page" 的所有请求,不需要任何分页的特殊知识
  • 如果用户快速分页,可能一次获取多个页面
  • 重新使用现有结果,即使页面大小发生变化

限制

我不会在其中添加分块或速率限制。因为 source observable 是一个愚蠢的 bufferTime 你 运行 进入问题:

  • "chunking" 将在重复数据删除之前发生。因此,如果您有 100 个针对单个 userId 的请求,您最终将只用 1 个元素触发多个请求
  • 如果您设置速率限制,您将无法检查您的队列。所以你最终可能会得到一个包含多个相同请求的很长的队列。

虽然这是一个悲观的观点。修复它意味着要完全使用有状态的 queue/batch 机制,这要复杂一个数量级。