在 RxJs 中实现具有固定堆栈的排队系统

implement a queueing system with a fixed stack in RxJs

假设我想要一个队列,其中任何时候只有 3 个项目被异步处理,我该怎么做?

这就是我的意思: 如果我有一组项目要上传到后端,即上传一些人工制品到云存储,然后 create/update 一个文档到反映每个人工制品的 url,我不想:

  1. Async/Await 在下一个之前进行每个上传操作 - 因为这会很慢
  2. 同时发送所有内容 - 这可能导致写入热点或速率限制
  3. 做一个 promise.race - 这最终导致 (2)
  4. 做一个 promise.all - 如果有一个很长的 运行 上传,过程会变慢。

而我想做的是:

  1. 有一个所有上传的队列,比如使用 RxJs 创建方法,例如from(array-of-upload-items) 一次处理 3 件物品。
  2. 当一个项目离开堆栈即完成时,我们向队列添加一个新项目
  3. 确保在任一时刻,堆栈中始终有 3 个项目正在处理,直到队列中没有更多项目等待放入堆栈。

我将如何使用 RxJs 来解决这个问题?

已编辑:27/June/2020

这是我的想法:

 const rxQueue = from(filesArray) // this is the list of files to upload say like 25 files or so

      rxQueue
        .pipe(
          mergeMap((item) =>
            of(item).pipe(
              tap(async (item) => {
                  await Promise.race([
                      processUpload(item[0]),
                      processUpload(item[1]),
                      processUpload(item[2]),
                  ])
              }),
            ),
            3
          ),
        )
        .subscribe()

目标是确保在任何时候都处理(上传)3 个文件,以便在一个文件上传过程结束时添加另一个文件以保持堆栈为 3 个上传过程。同理,如果2个文件同时上传结束,则向堆栈中添加2个新文件,以此类推,直到文件数组中的所有文件都上传完毕。

使用 Subject 作为队列并使用 mergeMap 有一个并发参数可以限制最大并发数

const queue=new Subject()
queque.asObservable().pipe(mergeMap(item=>httpCall(item),3)
queue.next(item)

我想你可以试试这个:

from(filesArray)
  .pipe(
    mergeMap(file => service.uploadFile(file), 3)
  )

这假设 service.uploadFile returns 一个 promise 或一个 observable。

假设您有 5 个文件,然后将从前 3 个创建 3 个可观察对象,当其中一个完成时,将获取第 4 个文件并从中创建一个新的可观察对象,依此类推。