如何在生成器函数中限制并行执行一些异步任务?

How can I execute some async tasks in parallel with limit in generator function?

我正在尝试并行执行一些异步任务,但同时运行的最大数量有限制任务。

有一个我想要实现的例子:

目前这个任务是运行一个接一个。它是这样实现的:

export function signData(dataItem) {
  cadesplugin.async_spawn(async function* (args) {
    //... nestedArgs assignment logic ...

    for (const id of dataItem.identifiers) {
      yield* idHandler(dataItem, id, args, nestedArgs);
    }
    
    // some extra logic after all tasks were finished
  }, firstArg, secondArg);
}

async function* idHandler(edsItem, researchId, args, nestedArgs) {
  ...
  let oDocumentNameAttr = yield cadesplugin.CreateObjectAsync("CADESCOM.CPAttribute");
  yield oDocumentNameAttr.propset_Value("Document Name");
  ...
  // this function mutates some external data, making API calls and returns void
}

不幸的是,我无法对 cadesplugin.* 函数进行任何更改,但我可以在我的代码中使用任何外部库(或内置 Promise)。

我找到了一些方法(eachLimit and parallelLimit) in async library that might work for me and 显示了如何处理它。

但是还有两个问题我无法解决:

  1. 如何将主要参数传递给嵌套函数?
  2. Main 函数是一个 generator 函数,所以我仍然需要在 main 函数和嵌套函数中使用 yield 表达式

有一个 link 到 cadesplugin.* source code,您可以在其中找到 async_spawn(以及另一个 cadesplugin.*)函数在我的代码中。

这是我试过但没有成功的代码:

await forEachLimit(dataItem.identifiers, 5, yield* async function* (researchId, callback) { 
  //... nested function code 
});

它会导致 Object is not async iterable 错误。

另一次尝试:

let functionArray = [];
dataItem.identifiers.forEach(researchId => {
  functionArray.push(researchIdHandler(dataItem, id, args, nestedArgs))
});
await parallelLimit(functionArray, 5);

它什么都不做。

我能否以某种方式解决这个问题,或者生成器函数不允许我这样做?

方钉圆孔

您不能使用异步迭代来解决这个问题。是for await .. of到运行串联的性质。 await 块,直到等待的承诺得到解决,循环才会继续。您需要更精确的控制级别来执行这些特定要求。

首先,我们有一个模拟长时间计算的模拟 myJob。这很可能是对您应用中某些 API 的网络请求 -

// any asynchronous task
const myJob = x =>
  sleep(rand(5000)).then(_ => x * 10)

使用中定义的Pool,我们实例化Pool(size=4),其中size是运行-

的并发线程数
const pool = new Pool(4)

为了人体工学,我在Poolclass的基础上加了一个run方法,这样更容易换行和运行作业-

class Pool {
  constructor (size) ...
  open () ...
  deferNow () ...
  deferStacked () ...

  // added method
  async run (t) {
    const close = await this.open()
    return t().then(close)
  }
}

现在我们需要编写一个使用我们的 pool 到 运行 myJob 的效果。在这里您还将决定如何处理结果。请注意,承诺 必须 包装在一个 thunk 中,否则池无法控制它何时开始 -

async function myEffect(x) {
  // run the job with the pool
  const r = await pool.run(_ => myJob(x))

  // do something with the result
  const s = document.createTextNode(`${r}\n`)
  document.body.appendChild(s)

  // return a value, if you want
  return r
}

现在 运行 通过在您的输入列表上映射 myEffect 一切。在我们的示例中 myEffect 我们 return r 这意味着在获取 所有 结果后结果也可用。这是可选的,但演示了程序如何知道一切何时完成 -

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(myEffect))
  .then(JSON.stringify)
  .then(console.log, console.error)

完整程序演示

在下面的功能演示中,我压缩了定义以便我们可以一次看到它们。 运行在自己的浏览器中验证结果的程序-

class Pool {
  constructor (size = 4) { Object.assign(this, { pool: new Set, stack: [], size }) }
  open () { return this.pool.size < this.size ? this.deferNow() : this.deferStacked() }
  async run (t) { const close = await this.open(); return t().then(close) }
  deferNow () { const [t, close] = thread(); const p = t.then(_ => this.pool.delete(p)).then(_ => this.stack.length && this.stack.pop().close()); this.pool.add(p); return close }
  deferStacked () { const [t, close] = thread(); this.stack.push({ close }); return t.then(_ => this.deferNow()) }
}
const rand = x => Math.random() * x
const effect = f => x => (f(x), x)
const thread = close => [new Promise(r => { close = effect(r) }), close]
const sleep = ms => new Promise(r => setTimeout(r, ms))

const myJob = x =>
  sleep(rand(5000)).then(_ => x * 10)

async function myEffect(x) {
  const r = await pool.run(_ => myJob(x))
  const s = document.createTextNode(`${r}\n`)
  document.body.appendChild(s)
  return r
}
  
const pool = new Pool(4)

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(myEffect))
  .then(JSON.stringify)
  .then(console.log, console.error)

慢点

Pool 以上 运行s 并发作业尽快。你可能也对throttle感兴趣,原来post中也有介绍。我们可以使用 throttle 包装我们的作业,而不是让 Pool 更复杂,让调用者控制作业应该花费的 最小 时间 -

const throttle = (p, ms) =>
  Promise.all([ p, sleep(ms) ]).then(([ value, _ ]) => value)

我们可以在myEffect中添加一个throttle。现在如果 myJob 运行s 非常快,在下一个作业是 运行 -

之前至少会经过 5 秒
async function myEffect(x) {
  const r = await pool.run(_ => throttle(myJob(x), 5000))
  const s = document.createTextNode(`${r}\n`)
  document.body.appendChild(s)
  return r
}

一般来说,申请应该比较好。

但是,如果您也沉迷于 cadesplugin.* generator 函数并且并不真正关心重量级外部库,那么这个答案可能也会有所帮助。

(如果你担心重量级的外部库,你仍然可以把这个答案和@Mulan 的答案混在一起)

异步任务 运行 可以简单地使用 bluebird library 中的 Promise.map 函数和双重使用 cadesplugin.async_spawn 函数来解决。

代码如下所示:

export function signData(dataItem) {
  cadesplugin.async_spawn(async function* (args) {
    // some extra logic before all of the tasks

    await Promise.map(dataItem.identifiers,
      (id) => cadesplugin.async_spawn(async function* (args) {
        // ...
        let oDocumentNameAttr = yield cadesplugin.CreateObjectAsync("CADESCOM.CPAttribute");
        yield oDocumentNameAttr.propset_Value("Document Name");
        // ...
        // this function mutates some external data and making API calls
      }),
      {
        concurrency: 5 //Parallel tasks count
      });
    
    // some extra logic after all tasks were finished
  }, firstArg, secondArg);
}

魔法来自 async_spawn 函数,定义为:

function async_spawn(generatorFunction) {
  async function continuer(verb, arg) {
    let result;
    try {
      result = await generator[verb](arg);
    } catch (err) {
      return Promise.reject(err);
    }
    if (result.done) {
      return result.value;
    } else {
      return Promise.resolve(result.value).then(onFulfilled, onRejected);
    }
  }

  let generator = generatorFunction(Array.prototype.slice.call(arguments, 1));
  let onFulfilled = continuer.bind(continuer, "next");
  let onRejected = continuer.bind(continuer, "throw");
  return onFulfilled();
}

它可以在 yield 表达式上暂停 internal 生成器函数的执行,而不暂停 whole 生成器函数。