在 RxJs 中实现具有固定堆栈的排队系统
implement a queueing system with a fixed stack in RxJs
假设我想要一个队列,其中任何时候只有 3 个项目被异步处理,我该怎么做?
这就是我的意思: 如果我有一组项目要上传到后端,即上传一些人工制品到云存储,然后 create/update 一个文档到反映每个人工制品的 url,我不想:
- Async/Await 在下一个之前进行每个上传操作 - 因为这会很慢
- 同时发送所有内容 - 这可能导致写入热点或速率限制
- 做一个 promise.race - 这最终导致 (2)
- 做一个 promise.all - 如果有一个很长的 运行 上传,过程会变慢。
而我想做的是:
- 有一个所有上传的队列,比如使用 RxJs 创建方法,例如
from(array-of-upload-items)
一次处理 3 件物品。
- 当一个项目离开堆栈即完成时,我们向队列添加一个新项目
- 确保在任一时刻,堆栈中始终有 3 个项目正在处理,直到队列中没有更多项目等待放入堆栈。
我将如何使用 RxJs 来解决这个问题?
已编辑:27/June/2020
这是我的想法:
const rxQueue = from(filesArray) // this is the list of files to upload say like 25 files or so
rxQueue
.pipe(
mergeMap((item) =>
of(item).pipe(
tap(async (item) => {
await Promise.race([
processUpload(item[0]),
processUpload(item[1]),
processUpload(item[2]),
])
}),
),
3
),
)
.subscribe()
目标是确保在任何时候都处理(上传)3 个文件,以便在一个文件上传过程结束时添加另一个文件以保持堆栈为 3 个上传过程。同理,如果2个文件同时上传结束,则向堆栈中添加2个新文件,以此类推,直到文件数组中的所有文件都上传完毕。
使用 Subject
作为队列并使用 mergeMap
有一个并发参数可以限制最大并发数
const queue=new Subject()
queque.asObservable().pipe(mergeMap(item=>httpCall(item),3)
queue.next(item)
我想你可以试试这个:
from(filesArray)
.pipe(
mergeMap(file => service.uploadFile(file), 3)
)
这假设 service.uploadFile
returns 一个 promise 或一个 observable。
假设您有 5 个文件,然后将从前 3 个创建 3 个可观察对象,当其中一个完成时,将获取第 4 个文件并从中创建一个新的可观察对象,依此类推。
假设我想要一个队列,其中任何时候只有 3 个项目被异步处理,我该怎么做?
这就是我的意思: 如果我有一组项目要上传到后端,即上传一些人工制品到云存储,然后 create/update 一个文档到反映每个人工制品的 url,我不想:
- Async/Await 在下一个之前进行每个上传操作 - 因为这会很慢
- 同时发送所有内容 - 这可能导致写入热点或速率限制
- 做一个 promise.race - 这最终导致 (2)
- 做一个 promise.all - 如果有一个很长的 运行 上传,过程会变慢。
而我想做的是:
- 有一个所有上传的队列,比如使用 RxJs 创建方法,例如
from(array-of-upload-items)
一次处理 3 件物品。 - 当一个项目离开堆栈即完成时,我们向队列添加一个新项目
- 确保在任一时刻,堆栈中始终有 3 个项目正在处理,直到队列中没有更多项目等待放入堆栈。
我将如何使用 RxJs 来解决这个问题?
已编辑:27/June/2020
这是我的想法:
const rxQueue = from(filesArray) // this is the list of files to upload say like 25 files or so
rxQueue
.pipe(
mergeMap((item) =>
of(item).pipe(
tap(async (item) => {
await Promise.race([
processUpload(item[0]),
processUpload(item[1]),
processUpload(item[2]),
])
}),
),
3
),
)
.subscribe()
目标是确保在任何时候都处理(上传)3 个文件,以便在一个文件上传过程结束时添加另一个文件以保持堆栈为 3 个上传过程。同理,如果2个文件同时上传结束,则向堆栈中添加2个新文件,以此类推,直到文件数组中的所有文件都上传完毕。
使用 Subject
作为队列并使用 mergeMap
有一个并发参数可以限制最大并发数
const queue=new Subject()
queque.asObservable().pipe(mergeMap(item=>httpCall(item),3)
queue.next(item)
我想你可以试试这个:
from(filesArray)
.pipe(
mergeMap(file => service.uploadFile(file), 3)
)
这假设 service.uploadFile
returns 一个 promise 或一个 observable。
假设您有 5 个文件,然后将从前 3 个创建 3 个可观察对象,当其中一个完成时,将获取第 4 个文件并从中创建一个新的可观察对象,依此类推。