如何在生成器函数中限制并行执行一些异步任务?
How can I execute some async tasks in parallel with limit in generator function?
我正在尝试并行执行一些异步任务,但同时运行的最大数量有限制任务。
有一个我想要实现的例子:
目前这个任务是运行一个接一个。它是这样实现的:
export function signData(dataItem) {
cadesplugin.async_spawn(async function* (args) {
//... nestedArgs assignment logic ...
for (const id of dataItem.identifiers) {
yield* idHandler(dataItem, id, args, nestedArgs);
}
// some extra logic after all tasks were finished
}, firstArg, secondArg);
}
async function* idHandler(edsItem, researchId, args, nestedArgs) {
...
let oDocumentNameAttr = yield cadesplugin.CreateObjectAsync("CADESCOM.CPAttribute");
yield oDocumentNameAttr.propset_Value("Document Name");
...
// this function mutates some external data, making API calls and returns void
}
不幸的是,我无法对 cadesplugin.*
函数进行任何更改,但我可以在我的代码中使用任何外部库(或内置 Promise
)。
我找到了一些方法(eachLimit and parallelLimit) in async library that might work for me and 显示了如何处理它。
但是还有两个问题我无法解决:
- 如何将主要参数传递给嵌套函数?
- Main 函数是一个 generator 函数,所以我仍然需要在 main 函数和嵌套函数中使用 yield 表达式
有一个 link 到 cadesplugin.* source code,您可以在其中找到 async_spawn(以及另一个 cadesplugin.*
)函数在我的代码中。
这是我试过但没有成功的代码:
await forEachLimit(dataItem.identifiers, 5, yield* async function* (researchId, callback) {
//... nested function code
});
它会导致 Object is not async iterable 错误。
另一次尝试:
let functionArray = [];
dataItem.identifiers.forEach(researchId => {
functionArray.push(researchIdHandler(dataItem, id, args, nestedArgs))
});
await parallelLimit(functionArray, 5);
它什么都不做。
我能否以某种方式解决这个问题,或者生成器函数不允许我这样做?
方钉圆孔
您不能使用异步迭代来解决这个问题。是for await .. of
到运行串联的性质。 await
块,直到等待的承诺得到解决,循环才会继续。您需要更精确的控制级别来执行这些特定要求。
首先,我们有一个模拟长时间计算的模拟 myJob
。这很可能是对您应用中某些 API 的网络请求 -
// any asynchronous task
const myJob = x =>
sleep(rand(5000)).then(_ => x * 10)
使用中定义的Pool
,我们实例化Pool(size=4)
,其中size
是运行-
的并发线程数
const pool = new Pool(4)
为了人体工学,我在Pool
class的基础上加了一个run
方法,这样更容易换行和运行作业-
class Pool {
constructor (size) ...
open () ...
deferNow () ...
deferStacked () ...
// added method
async run (t) {
const close = await this.open()
return t().then(close)
}
}
现在我们需要编写一个使用我们的 pool
到 运行 myJob
的效果。在这里您还将决定如何处理结果。请注意,承诺 必须 包装在一个 thunk 中,否则池无法控制它何时开始 -
async function myEffect(x) {
// run the job with the pool
const r = await pool.run(_ => myJob(x))
// do something with the result
const s = document.createTextNode(`${r}\n`)
document.body.appendChild(s)
// return a value, if you want
return r
}
现在 运行 通过在您的输入列表上映射 myEffect
一切。在我们的示例中 myEffect
我们 return r
这意味着在获取 所有 结果后结果也可用。这是可选的,但演示了程序如何知道一切何时完成 -
Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(myEffect))
.then(JSON.stringify)
.then(console.log, console.error)
完整程序演示
在下面的功能演示中,我压缩了定义以便我们可以一次看到它们。 运行在自己的浏览器中验证结果的程序-
class Pool {
constructor (size = 4) { Object.assign(this, { pool: new Set, stack: [], size }) }
open () { return this.pool.size < this.size ? this.deferNow() : this.deferStacked() }
async run (t) { const close = await this.open(); return t().then(close) }
deferNow () { const [t, close] = thread(); const p = t.then(_ => this.pool.delete(p)).then(_ => this.stack.length && this.stack.pop().close()); this.pool.add(p); return close }
deferStacked () { const [t, close] = thread(); this.stack.push({ close }); return t.then(_ => this.deferNow()) }
}
const rand = x => Math.random() * x
const effect = f => x => (f(x), x)
const thread = close => [new Promise(r => { close = effect(r) }), close]
const sleep = ms => new Promise(r => setTimeout(r, ms))
const myJob = x =>
sleep(rand(5000)).then(_ => x * 10)
async function myEffect(x) {
const r = await pool.run(_ => myJob(x))
const s = document.createTextNode(`${r}\n`)
document.body.appendChild(s)
return r
}
const pool = new Pool(4)
Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(myEffect))
.then(JSON.stringify)
.then(console.log, console.error)
慢点
Pool
以上 运行s 并发作业尽快。你可能也对throttle
感兴趣,原来post中也有介绍。我们可以使用 throttle
包装我们的作业,而不是让 Pool
更复杂,让调用者控制作业应该花费的 最小 时间 -
const throttle = (p, ms) =>
Promise.all([ p, sleep(ms) ]).then(([ value, _ ]) => value)
我们可以在myEffect
中添加一个throttle
。现在如果 myJob
运行s 非常快,在下一个作业是 运行 -
之前至少会经过 5 秒
async function myEffect(x) {
const r = await pool.run(_ => throttle(myJob(x), 5000))
const s = document.createTextNode(`${r}\n`)
document.body.appendChild(s)
return r
}
一般来说,申请应该比较好。
但是,如果您也沉迷于 cadesplugin.*
generator 函数并且并不真正关心重量级外部库,那么这个答案可能也会有所帮助。
(如果你担心重量级的外部库,你仍然可以把这个答案和@Mulan 的答案混在一起)
异步任务 运行 可以简单地使用 bluebird library 中的 Promise.map
函数和双重使用 cadesplugin.async_spawn
函数来解决。
代码如下所示:
export function signData(dataItem) {
cadesplugin.async_spawn(async function* (args) {
// some extra logic before all of the tasks
await Promise.map(dataItem.identifiers,
(id) => cadesplugin.async_spawn(async function* (args) {
// ...
let oDocumentNameAttr = yield cadesplugin.CreateObjectAsync("CADESCOM.CPAttribute");
yield oDocumentNameAttr.propset_Value("Document Name");
// ...
// this function mutates some external data and making API calls
}),
{
concurrency: 5 //Parallel tasks count
});
// some extra logic after all tasks were finished
}, firstArg, secondArg);
}
魔法来自 async_spawn
函数,定义为:
function async_spawn(generatorFunction) {
async function continuer(verb, arg) {
let result;
try {
result = await generator[verb](arg);
} catch (err) {
return Promise.reject(err);
}
if (result.done) {
return result.value;
} else {
return Promise.resolve(result.value).then(onFulfilled, onRejected);
}
}
let generator = generatorFunction(Array.prototype.slice.call(arguments, 1));
let onFulfilled = continuer.bind(continuer, "next");
let onRejected = continuer.bind(continuer, "throw");
return onFulfilled();
}
它可以在 yield 表达式上暂停 internal 生成器函数的执行,而不暂停 whole 生成器函数。
我正在尝试并行执行一些异步任务,但同时运行的最大数量有限制任务。
有一个我想要实现的例子:
目前这个任务是运行一个接一个。它是这样实现的:
export function signData(dataItem) {
cadesplugin.async_spawn(async function* (args) {
//... nestedArgs assignment logic ...
for (const id of dataItem.identifiers) {
yield* idHandler(dataItem, id, args, nestedArgs);
}
// some extra logic after all tasks were finished
}, firstArg, secondArg);
}
async function* idHandler(edsItem, researchId, args, nestedArgs) {
...
let oDocumentNameAttr = yield cadesplugin.CreateObjectAsync("CADESCOM.CPAttribute");
yield oDocumentNameAttr.propset_Value("Document Name");
...
// this function mutates some external data, making API calls and returns void
}
不幸的是,我无法对 cadesplugin.*
函数进行任何更改,但我可以在我的代码中使用任何外部库(或内置 Promise
)。
我找到了一些方法(eachLimit and parallelLimit) in async library that might work for me and
但是还有两个问题我无法解决:
- 如何将主要参数传递给嵌套函数?
- Main 函数是一个 generator 函数,所以我仍然需要在 main 函数和嵌套函数中使用 yield 表达式
有一个 link 到 cadesplugin.* source code,您可以在其中找到 async_spawn(以及另一个 cadesplugin.*
)函数在我的代码中。
这是我试过但没有成功的代码:
await forEachLimit(dataItem.identifiers, 5, yield* async function* (researchId, callback) {
//... nested function code
});
它会导致 Object is not async iterable 错误。
另一次尝试:
let functionArray = [];
dataItem.identifiers.forEach(researchId => {
functionArray.push(researchIdHandler(dataItem, id, args, nestedArgs))
});
await parallelLimit(functionArray, 5);
它什么都不做。
我能否以某种方式解决这个问题,或者生成器函数不允许我这样做?
方钉圆孔
您不能使用异步迭代来解决这个问题。是for await .. of
到运行串联的性质。 await
块,直到等待的承诺得到解决,循环才会继续。您需要更精确的控制级别来执行这些特定要求。
首先,我们有一个模拟长时间计算的模拟 myJob
。这很可能是对您应用中某些 API 的网络请求 -
// any asynchronous task
const myJob = x =>
sleep(rand(5000)).then(_ => x * 10)
使用Pool
,我们实例化Pool(size=4)
,其中size
是运行-
const pool = new Pool(4)
为了人体工学,我在Pool
class的基础上加了一个run
方法,这样更容易换行和运行作业-
class Pool {
constructor (size) ...
open () ...
deferNow () ...
deferStacked () ...
// added method
async run (t) {
const close = await this.open()
return t().then(close)
}
}
现在我们需要编写一个使用我们的 pool
到 运行 myJob
的效果。在这里您还将决定如何处理结果。请注意,承诺 必须 包装在一个 thunk 中,否则池无法控制它何时开始 -
async function myEffect(x) {
// run the job with the pool
const r = await pool.run(_ => myJob(x))
// do something with the result
const s = document.createTextNode(`${r}\n`)
document.body.appendChild(s)
// return a value, if you want
return r
}
现在 运行 通过在您的输入列表上映射 myEffect
一切。在我们的示例中 myEffect
我们 return r
这意味着在获取 所有 结果后结果也可用。这是可选的,但演示了程序如何知道一切何时完成 -
Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(myEffect))
.then(JSON.stringify)
.then(console.log, console.error)
完整程序演示
在下面的功能演示中,我压缩了定义以便我们可以一次看到它们。 运行在自己的浏览器中验证结果的程序-
class Pool {
constructor (size = 4) { Object.assign(this, { pool: new Set, stack: [], size }) }
open () { return this.pool.size < this.size ? this.deferNow() : this.deferStacked() }
async run (t) { const close = await this.open(); return t().then(close) }
deferNow () { const [t, close] = thread(); const p = t.then(_ => this.pool.delete(p)).then(_ => this.stack.length && this.stack.pop().close()); this.pool.add(p); return close }
deferStacked () { const [t, close] = thread(); this.stack.push({ close }); return t.then(_ => this.deferNow()) }
}
const rand = x => Math.random() * x
const effect = f => x => (f(x), x)
const thread = close => [new Promise(r => { close = effect(r) }), close]
const sleep = ms => new Promise(r => setTimeout(r, ms))
const myJob = x =>
sleep(rand(5000)).then(_ => x * 10)
async function myEffect(x) {
const r = await pool.run(_ => myJob(x))
const s = document.createTextNode(`${r}\n`)
document.body.appendChild(s)
return r
}
const pool = new Pool(4)
Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(myEffect))
.then(JSON.stringify)
.then(console.log, console.error)
慢点
Pool
以上 运行s 并发作业尽快。你可能也对throttle
感兴趣,原来post中也有介绍。我们可以使用 throttle
包装我们的作业,而不是让 Pool
更复杂,让调用者控制作业应该花费的 最小 时间 -
const throttle = (p, ms) =>
Promise.all([ p, sleep(ms) ]).then(([ value, _ ]) => value)
我们可以在myEffect
中添加一个throttle
。现在如果 myJob
运行s 非常快,在下一个作业是 运行 -
async function myEffect(x) {
const r = await pool.run(_ => throttle(myJob(x), 5000))
const s = document.createTextNode(`${r}\n`)
document.body.appendChild(s)
return r
}
一般来说,申请
但是,如果您也沉迷于 cadesplugin.*
generator 函数并且并不真正关心重量级外部库,那么这个答案可能也会有所帮助。
(如果你担心重量级的外部库,你仍然可以把这个答案和@Mulan 的答案混在一起)
异步任务 运行 可以简单地使用 bluebird library 中的 Promise.map
函数和双重使用 cadesplugin.async_spawn
函数来解决。
代码如下所示:
export function signData(dataItem) {
cadesplugin.async_spawn(async function* (args) {
// some extra logic before all of the tasks
await Promise.map(dataItem.identifiers,
(id) => cadesplugin.async_spawn(async function* (args) {
// ...
let oDocumentNameAttr = yield cadesplugin.CreateObjectAsync("CADESCOM.CPAttribute");
yield oDocumentNameAttr.propset_Value("Document Name");
// ...
// this function mutates some external data and making API calls
}),
{
concurrency: 5 //Parallel tasks count
});
// some extra logic after all tasks were finished
}, firstArg, secondArg);
}
魔法来自 async_spawn
函数,定义为:
function async_spawn(generatorFunction) {
async function continuer(verb, arg) {
let result;
try {
result = await generator[verb](arg);
} catch (err) {
return Promise.reject(err);
}
if (result.done) {
return result.value;
} else {
return Promise.resolve(result.value).then(onFulfilled, onRejected);
}
}
let generator = generatorFunction(Array.prototype.slice.call(arguments, 1));
let onFulfilled = continuer.bind(continuer, "next");
let onRejected = continuer.bind(continuer, "throw");
return onFulfilled();
}
它可以在 yield 表达式上暂停 internal 生成器函数的执行,而不暂停 whole 生成器函数。