异步生成器:产生被拒绝的承诺

Async Generator: Yielding a rejected promise

我一直在玩弄异步生成器,试图制作一个 "promise ordering" 生成器,它接受一系列承诺,并按照它们解决或拒绝的顺序一个接一个地产生承诺。所以像:

async function* orderProms(prom_arr) {

    // Make a copy so the splices don't mess it up.
    const proms = [...prom_arr];

    while (proms.length) {
        // Tag each promise with it's index, so that we can remove it for the next loop.
        const {prom, index} = await Promise.race(proms.map((prom, index) => prom.then(
            () => ({prom, index}),
            () => ({prom, index})
        )));

        proms.splice(index, 1);
        yield prom;
    }
}

有了像这样使用这个生成器的想法:

const resAfter = (val, delay) => new Promise(res => setTimeout(() => res(val), delay));
const rejAfter = (val, delay) => new Promise((_, rej) => setTimeout(() => rej(val), delay));

const promises = [
    resAfter("Third", 3000),
    resAfter("First", 1000),
    rejAfter("Second", 2000), // NOTE: this one rejects!
];

(async () => {

    let ordered = orderProms(promises);

    let done = false;
    for (let next_promise = ordered.next(); !done; next_promise = ordered.next()) {
        const next = await next_promise
            .catch(err => ({done: false, value: `Caught error: ${err}`}));

        done = next.done;
        if (!done) console.log(next.value);
    }
})()

但是,我注意到这将达到第二个承诺,然后生成器将停止。好像是因为 "second" promise 被拒绝了。当 prom 被拒绝时,在生成器中调用 yield prom 将在生成器 中创建异常

但这就是我困惑的根源。我不想在这里创建异常,我只想将被拒绝的承诺作为迭代器结果的 value 产生。我不希望它被打开。这几乎就像被视为 yield await prom;,但如您所见,没有 await 调用。

这是怎么回事,我怎样才能简单地从这个生成器中按原样产生一个被拒绝的承诺。


这是可运行片段中的上述代码:

async function* orderProms(prom_arr) {

    // Make a copy so the splices don't mess it up.
    const proms = [...prom_arr];

    while (proms.length) {
        // Tag each promise with it's index, so that we can remove it for the next loop.
        const {prom, index} = await Promise.race(proms.map((prom, index) => prom.then(
            () => ({prom, index}),
            () => ({prom, index})
        )));

        proms.splice(index, 1);
        yield prom;
    }
}

const resAfter = (val, delay) => new Promise(res => setTimeout(() => res(val), delay));
const rejAfter = (val, delay) => new Promise((_, rej) => setTimeout(() => rej(val), delay));

const promises = [
    resAfter("Third", 3000),
    resAfter("First", 1000),
    rejAfter("Second", 2000), // NOTE: this one rejects!
];

(async () => {

    let ordered = orderProms(promises);

    let done = false;
    for (let next_promise = ordered.next(); !done; next_promise = ordered.next()) {
        const next = await next_promise
            .catch(err => ({done: false, value: `Caught error: ${err}`}));

        done = next.done;
        if (!done) console.log(next.value);
    }
})()

It's almost like this is being treated as yield await prom. What is going on here?

是异步生成器的行为方式。

how can I simply yield a rejected promise as-is from this generator.

你不能。请注意,异步迭代器预计将由

使用
try {
    for await (const value of orderProms(promises)) {
        console.log(value);
    }
} catch(err) {
    console.error('Caught error: ', err);
}

语法中没有针对个别错误处理的便利。当出现异常时,循环停止,生成器完成。点.

那你能做什么?我看到三个选择:

  • 保持原样并将早期失败视为一项功能(类似于 Promise.all
  • 处理错误(在 orderProms 中或在将承诺传递给它之前)并产生承诺状态和价值的元组

    for await (const value of orderProms(promises.map(prom =>
        prom.catch(err => `Caught error: ${err}`)
    ))) {
        console.log(value);
    }
    
  • 使用一个普通的(非async)生成器,您可以从中手动产生一个又一个承诺,以便能够以您想要的方式使用它

您可以让 promises 解析为类似于您从 Promise.allSettled:

得到的东西

async function* orderProms(prom_arr) {
    // Make a copy so the splices don't mess it up.
    const proms = new Set(prom_arr.map((prom, index) => ({prom, index})));
    while (proms.size) {
        const settled = await Promise.race(Array.from(proms, obj => obj.prom.then(
            value => Object.assign(obj, { value, status: "fulfilled" }),
            error => Object.assign(obj, { error, status: "rejected" }),
        )));
        proms.delete(settled);
        let { prom, ...rest } = settled;
        yield rest;
    }
}

const resAfter = (val, delay) => new Promise(res => setTimeout(() => res(val), delay));
const rejAfter = (val, delay) => new Promise((_, rej) => setTimeout(() => rej(val), delay));

const promises = [
    resAfter("Third", 3000),
    resAfter("First", 1000),
    rejAfter("Second", 2000), // NOTE: this one rejects!
];

(async () => {
    for await (let result of orderProms(promises)) {
        console.log(JSON.stringify(result));
    }
})().catch(err => console.log(err.message));

我不能说接受的答案是错误的,但也不是很正确。尤其是

When there's an exception, the loop stops, the generator is done. Point.

部分有问题。

根据你的问题,虽然现代 JS 允许我们对这个问题采取一些优雅的方法,但按照你的要求,我们仍然可以让它工作,即使我认为它不是......那么好。

第一部分 - 对原始问题的回答

我不会详细介绍,只是注意生成器函数 orderPromsfinally 的用法,在从内部抛出异常后,事情仍然发生(生成器未完成) .所以.. 一种方法可能是;

async function* orderProms(prom_arr) {

    // Make a copy so the splices don't mess it up.
    var proms = [...prom_arr];

        // Tag each promise with it's index, so that we can remove it for the next loop.
     try {
      while (proms.length) {
            var {prom, index} = await Promise.race(proms.map((prom, index) => prom.then(
                () => ({prom, index}),
                () => ({prom, index})
            )));
            
            proms.splice(index, 1);
            yield prom;
          }
    } finally {
        proms.length && (ordered = orderProms(proms));
      }
}

var resAfter = (val, delay) => new Promise(res => setTimeout(() => res(val), delay)),
    rejAfter = (val, delay) => new Promise((_, rej) => setTimeout(() => rej(val), delay)),
    promises = [ resAfter("Third", 3000)
               , resAfter("First", 1000)
               , rejAfter("Second", 2000) // NOTE: this one rejects!
               ],
    ordered  = orderProms(promises);

async function endPoint() {
    try {
      for await (var value of ordered) {
        console.log(value)
      }
    }
    catch(e){
      console.log(`caught rejection ${e} at endpoint`);
      endPoint();
    }
}

endPoint();

第二部分 - 问题的优雅解决方案

现在想象一下.. 如果我们有一个数组,我们可以像处理普通数组一样用承诺填充它,并且其中的承诺会根据它们的解决/拒绝时间自动排序。

首先让我们扩展 Array 类型并赋予它特殊的异步能力。下面的代码定义了 SortedAsyncArray 类型并且只是一个框架。它没有经过全面测试,但应该足以给出一个想法。再次注意 finally 部分,因为它仅在 yield 由于异常或耗尽(生成器完成情况)而挂起时才执行。

class SortedPromisesArray extends Array {
  constructor(...args){
    super(...args);
  };
  async *[Symbol.asyncIterator]() {
    try {
      while(this.length){
        var {v,i} = await Promise.race(this.map((p,i) => p.then(v => ({v,i}))));
        this.splice(i,1);
        yield v;
      }
    } finally {
        this.length && this.splice(i,1);
    };
  };
};

那我们应该如何使用这个异步数组呢?我想出的方法如下。

var promise  = (val, delay, resolves) => new Promise((v,x) => setTimeout(() => resolves ? v(val) : x(val), delay)),
    promises = [ promise("Third",  3000, true)
               , promise("First",  1000, true)
               , promise("Second", 2000, false) // NOTE: this one rejects!
               ],
    sortedPS = new SortedPromisesArray(...promises);

async function sink() {
  try {
    for await (let value of sortedPS){
      console.log(`Got: ${value}`);
    }
  } catch(err) {
    console.log(`caught at endpoint --> exception ${err}`);
    sink();
  };
};
sink();

第三部分正确的方法

我们在 第二部分 中看到的内容非常漂亮和优雅,但它在滥用 Promise.race()。如果这不是一个异步代码,那将是一种罪过。好吧……在我的书中,这仍然是一种罪过。你为什么要多次竞速最慢的承诺 .length

现在我们将尝试通过深入研究

来正确解决这种愚蠢的问题
  • Private Class Fields 隐藏那些我们不想让任何人弄乱的变量。
  • 我们甚至会检查提供的值是否为真正的承诺,并丢弃那些不是承诺的值。尽管在检查之后,如果您想使用 Promise.resolve(notAPromise) 语句,您可以向他们承诺。
  • 删除 async *[asyncIterator]() 部分中的 finally 块。

我们神奇排序的异步数组现在变成了

class SortedPromisesArray extends Array {
  #RESOLVE;
  #REJECT;
  #COUNT;
  constructor(...args){
    super(...args.filter(p => Object(p).constructor === Promise));
    this.#COUNT = this.length;
    this.forEach(p => p.then(v => this.#RESOLVE(v), e => this.#REJECT(e)));
  };
  async *[Symbol.asyncIterator]() {
    while(this.#COUNT--) {
      yield new Promise((resolve,reject) => ( this.#RESOLVE = resolve
                                            , this.#REJECT  = reject
                                            ));
    };
  };
};

sink() 功能仅与 第二部分 中的功能保持一致。

var promise  = (val, delay, resolves) => new Promise((v,x) => setTimeout(() => resolves ? v(val) : x(val), delay)),
    promises = [ promise("Third", 3000, true)
               , promise("First", 1000, true)
               , promise("Second", 2000, false) // NOTE: this one rejects!
               ],
    sortedPS = new SPA(...promises);

async function sink() {
  try {
    for await (let value of sortedPS){
      console.log(`Got: ${value}`);
    }
  } catch(err) {
    console.log(`caught at endpoint --> exception ${err}`);
    sink();
  }
}

sink();

再次...这不是生产代码。它在这里仅用于演示目的,但它是向现代 JS/TS.

中的良好设计模式迈出的一步