异步生成器在解决时产生承诺结果

async generator yielding promise results as they are resolved

假设我想同时获取 10 个 url, 并处理收到的回复 (可能与订单顺序不同 它们出现在原始列表中)。 忽略拒绝的可能性,一种方法是简单地附加一个“then”回调 每个承诺,然后等待他们全部完成 使用 Promise.all().

const fetch_promises = [
  fetch("https://cors-demo.glitch.me/allow-cors"),
  fetch("/"),
  fetch("."),
  fetch(""),
  fetch("https://enable-cors.org"),
  fetch("https://html5rocks-cors.s3-website-us-east-1.amazonaws.com/index.html"),
  fetch("https://api.github.com"),
  fetch("https://api.flickr.com/services/rest/"),
];
const processing_promises = [];
for (const fetch_promise of fetch_promises) {
  processing_promises.push(fetch_promise.then(response => {
    // Process response.  In this example, that means just
    // print it.
    console.log("got a response: ",response);
  }));
}
await Promise.all(processing_promises);

切换到输出更清晰、更确定的示例:

const sleep = millis => new Promise(resolve=>setTimeout(resolve, millis));
const sleep_promises = [
    sleep(3000).then(()=>"slept 3000"),
    sleep(1000).then(()=>"slept 1000"),
    sleep(5000).then(()=>"slept 5000"),
    sleep(4000).then(()=>"slept 4000"),
    sleep(2000).then(()=>"slept 2000"),
];
const processing_promises = [];
for (const sleep_promise of sleep_promises) {
  processing_promises.push(sleep_promise.then(result => {
     console.log("promise resolved: ",result);
  }));
}
await Promise.all(processing_promises);

输出符合预期:

15:54:16.331 promise resolved:  slept 1000
15:54:17.331 promise resolved:  slept 2000
15:54:18.331 promise resolved:  slept 3000
15:54:19.332 promise resolved:  slept 4000
15:54:20.331 promise resolved:  slept 5000

我的问题是:假设我想要或需要, 将上述处理表达为 "async for..of" 循环,而不是“then”回调; 所以承诺结果需要以异步的形式出现 可迭代的。我将如何转换承诺数组 到这样的异步迭代?我要的是异步 生成器函数 AwaitAsTheyCome(),将承诺列表作为输入, 随着承诺的解决,它会一个一个地产生结果。 然后我会调用该函数并进行处理,如下所示:

for await (const result of AwaitAsTheyCome(sleep_promises)) {
 console.log("promise resolved: ",result);
}

它应该给出与上面相同的输出(具有相同的时间)。

以下尝试的解决方案显然行不通,但它可以让您了解我所期望的是多么简单和简短:

async function* AwaitAsTheyCome(promises) {
  for (const promise of promises) {
    promise.then(response => {
      yield response;  // WRONG
      // I want to yield it from AwaitAsTheyCome,
      // not from the current arrow function!
    });
  }
}

以下解决方案确实有效,但它比我预期的要为此编写的代码多。

async function* AwaitAsTheyCome(promises) {
  // Make a list of notifier promises and
  // functions that resolve those promises,
  // one for each of the original promises.
  const notifier_promises = [];
  const notifier_resolves = [];
  for (const promise of promises) {
    notifier_promises.push(
        new Promise(resolve=>notifier_resolves.push(resolve)));
  }

  const responses = [];
  for (const promise of promises) {
    promise.then(response => {
      responses.push(response);
      // send one notification (i.e. resolve the next notifier promise)
      notifier_resolves.shift()();
    });
  }

  for (const promise of promises) {
    // wait for one notification
    // (i.e. wait for the next notifier promise to be resolved).
    await notifier_promises.shift();
    // yield the corresponding response
    yield responses.shift();
  }
}

// Example/test usage
const sleep = millis => new Promise(resolve=>setTimeout(resolve, millis));
const sleep_promises = [
  sleep(3000).then(()=>"slept 3000"),
  sleep(1000).then(()=>"slept 1000"),
  sleep(5000).then(()=>"slept 5000"),
  sleep(4000).then(()=>"slept 4000"),
  sleep(2000).then(()=>"slept 2000"),
];
for await (const result of AwaitAsTheyCome(sleep_promises)) {
 console.log("promise resolved: ",result);
}

有没有更简单的方法来实现异步生成器函数AwaitAsTheyCome?

(我尝试用上面的代码制作一个 stacksnippet,但没有成功——我怀疑这是因为代码片段系统不理解新的异步生成器 and/or for await..of语法)

这会做同样的事情,无论您在循环中放置什么代码来处理 yield 结果都会进入 onfulfilled 回调。 Promise.all() 等待所有承诺像您的循环一样完成。

const sleep = millis => new Promise(resolve=>setTimeout(resolve, millis));
const sleep_promises = [
    sleep(3000).then(()=>"slept 3000"),
    sleep(1000).then(()=>"slept 1000"),
    sleep(5000).then(()=>"slept 5000"),
    sleep(4000).then(()=>"slept 4000"),
    sleep(2000).then(()=>"slept 2000"),
];

const onfulfilled = result => console.log("promise resolved: ",result);
sleep_promises.forEach( p => p.then(onfulfilled) )
await Promise.all(sleep_promises)

您可以通过

稍微简化代码
  • 在输入数组上只使用一个循环(尽管这可能会造成混淆)
  • 不使用 responses 数组,只是履行承诺
  • 不在 promises 数组上使用 .shift() 而是简单地循环它
async function* raceAll(input) {
  const promises = [];
  const resolvers = [];
  for (const p of input) {
    promises.push(new Promise(resolve=> {
      resolvers.push(resolve);
    }));
    p.then(result => {
      resolvers.shift()(result);
    });
  }

  for (const promise of promises) {
    yield promise;
  }
}

如果您不喜欢所需的代码量,我建议您将此实现的队列分解为一个单独的模块。例如,代码可以变得像

一样简单
function raceAll(promises) {
  const queue = new AsyncBlockingQueue();
  for (const p of promises) {
    p.then(result => {
      queue.enqueue(result); 
    });
  }
  return queue[Symbol.asyncIterator]();
}

然而,这两种实现都遗漏了一个关键问题:错误处理。如果这些承诺中的任何一个被拒绝,您将收到一个未处理的拒绝错误,这可能会使您的流程崩溃。要真正让异步迭代器拒绝下一个承诺,以便 try/catch 围绕 for await…of 循环可以处理它,您需要执行类似

async function* raceAll(input) {
  const promises = [];
  const resolvers = [];
  for (const p of input) {
    promises.push(new Promise(resolve => {
      resolvers.push(resolve);
    }));
    p.finally(() => {
      resolvers.shift()(p);
    });
    // works equivalent to:
    // p.then(result => {
    //   resolvers.shift()(result);
    // }, error => {
    //   resolvers.shift()(Promise.reject(error));
    // });
  }

  for (const promise of promises) {
    yield promise;
  }
}

用被拒绝的 promise 解决 promise 就可以了,这样我们仍然只需要一个 resolver 函数队列,而不是同时包含 resolvereject 函数的队列。