异步生成器在解决时产生承诺结果
async generator yielding promise results as they are resolved
假设我想同时获取 10 个 url,
并处理收到的回复
(可能与订单顺序不同
它们出现在原始列表中)。
忽略拒绝的可能性,一种方法是简单地附加一个“then”回调
每个承诺,然后等待他们全部完成
使用 Promise.all()
.
const fetch_promises = [
fetch("https://cors-demo.glitch.me/allow-cors"),
fetch("/"),
fetch("."),
fetch(""),
fetch("https://enable-cors.org"),
fetch("https://html5rocks-cors.s3-website-us-east-1.amazonaws.com/index.html"),
fetch("https://api.github.com"),
fetch("https://api.flickr.com/services/rest/"),
];
const processing_promises = [];
for (const fetch_promise of fetch_promises) {
processing_promises.push(fetch_promise.then(response => {
// Process response. In this example, that means just
// print it.
console.log("got a response: ",response);
}));
}
await Promise.all(processing_promises);
切换到输出更清晰、更确定的示例:
const sleep = millis => new Promise(resolve=>setTimeout(resolve, millis));
const sleep_promises = [
sleep(3000).then(()=>"slept 3000"),
sleep(1000).then(()=>"slept 1000"),
sleep(5000).then(()=>"slept 5000"),
sleep(4000).then(()=>"slept 4000"),
sleep(2000).then(()=>"slept 2000"),
];
const processing_promises = [];
for (const sleep_promise of sleep_promises) {
processing_promises.push(sleep_promise.then(result => {
console.log("promise resolved: ",result);
}));
}
await Promise.all(processing_promises);
输出符合预期:
15:54:16.331 promise resolved: slept 1000
15:54:17.331 promise resolved: slept 2000
15:54:18.331 promise resolved: slept 3000
15:54:19.332 promise resolved: slept 4000
15:54:20.331 promise resolved: slept 5000
我的问题是:假设我想要或需要,
将上述处理表达为 "async for..of" 循环,而不是“then”回调;
所以承诺结果需要以异步的形式出现
可迭代的。我将如何转换承诺数组
到这样的异步迭代?我要的是异步
生成器函数 AwaitAsTheyCome(),将承诺列表作为输入,
随着承诺的解决,它会一个一个地产生结果。
然后我会调用该函数并进行处理,如下所示:
for await (const result of AwaitAsTheyCome(sleep_promises)) {
console.log("promise resolved: ",result);
}
它应该给出与上面相同的输出(具有相同的时间)。
以下尝试的解决方案显然行不通,但它可以让您了解我所期望的是多么简单和简短:
async function* AwaitAsTheyCome(promises) {
for (const promise of promises) {
promise.then(response => {
yield response; // WRONG
// I want to yield it from AwaitAsTheyCome,
// not from the current arrow function!
});
}
}
以下解决方案确实有效,但它比我预期的要为此编写的代码多。
async function* AwaitAsTheyCome(promises) {
// Make a list of notifier promises and
// functions that resolve those promises,
// one for each of the original promises.
const notifier_promises = [];
const notifier_resolves = [];
for (const promise of promises) {
notifier_promises.push(
new Promise(resolve=>notifier_resolves.push(resolve)));
}
const responses = [];
for (const promise of promises) {
promise.then(response => {
responses.push(response);
// send one notification (i.e. resolve the next notifier promise)
notifier_resolves.shift()();
});
}
for (const promise of promises) {
// wait for one notification
// (i.e. wait for the next notifier promise to be resolved).
await notifier_promises.shift();
// yield the corresponding response
yield responses.shift();
}
}
// Example/test usage
const sleep = millis => new Promise(resolve=>setTimeout(resolve, millis));
const sleep_promises = [
sleep(3000).then(()=>"slept 3000"),
sleep(1000).then(()=>"slept 1000"),
sleep(5000).then(()=>"slept 5000"),
sleep(4000).then(()=>"slept 4000"),
sleep(2000).then(()=>"slept 2000"),
];
for await (const result of AwaitAsTheyCome(sleep_promises)) {
console.log("promise resolved: ",result);
}
有没有更简单的方法来实现异步生成器函数AwaitAsTheyCome?
(我尝试用上面的代码制作一个 stacksnippet,但没有成功——我怀疑这是因为代码片段系统不理解新的异步生成器 and/or for await..of
语法)
这会做同样的事情,无论您在循环中放置什么代码来处理 yield 结果都会进入 onfulfilled 回调。 Promise.all()
等待所有承诺像您的循环一样完成。
const sleep = millis => new Promise(resolve=>setTimeout(resolve, millis));
const sleep_promises = [
sleep(3000).then(()=>"slept 3000"),
sleep(1000).then(()=>"slept 1000"),
sleep(5000).then(()=>"slept 5000"),
sleep(4000).then(()=>"slept 4000"),
sleep(2000).then(()=>"slept 2000"),
];
const onfulfilled = result => console.log("promise resolved: ",result);
sleep_promises.forEach( p => p.then(onfulfilled) )
await Promise.all(sleep_promises)
您可以通过
稍微简化代码
- 在输入数组上只使用一个循环(尽管这可能会造成混淆)
- 不使用
responses
数组,只是履行承诺
- 不在 promises 数组上使用
.shift()
而是简单地循环它
async function* raceAll(input) {
const promises = [];
const resolvers = [];
for (const p of input) {
promises.push(new Promise(resolve=> {
resolvers.push(resolve);
}));
p.then(result => {
resolvers.shift()(result);
});
}
for (const promise of promises) {
yield promise;
}
}
如果您不喜欢所需的代码量,我建议您将此实现的队列分解为一个单独的模块。例如,代码可以变得像
一样简单
function raceAll(promises) {
const queue = new AsyncBlockingQueue();
for (const p of promises) {
p.then(result => {
queue.enqueue(result);
});
}
return queue[Symbol.asyncIterator]();
}
然而,这两种实现都遗漏了一个关键问题:错误处理。如果这些承诺中的任何一个被拒绝,您将收到一个未处理的拒绝错误,这可能会使您的流程崩溃。要真正让异步迭代器拒绝下一个承诺,以便 try
/catch
围绕 for await…of
循环可以处理它,您需要执行类似
async function* raceAll(input) {
const promises = [];
const resolvers = [];
for (const p of input) {
promises.push(new Promise(resolve => {
resolvers.push(resolve);
}));
p.finally(() => {
resolvers.shift()(p);
});
// works equivalent to:
// p.then(result => {
// resolvers.shift()(result);
// }, error => {
// resolvers.shift()(Promise.reject(error));
// });
}
for (const promise of promises) {
yield promise;
}
}
用被拒绝的 promise 解决 promise 就可以了,这样我们仍然只需要一个 resolver
函数队列,而不是同时包含 resolve
和 reject
函数的队列。
假设我想同时获取 10 个 url,
并处理收到的回复
(可能与订单顺序不同
它们出现在原始列表中)。
忽略拒绝的可能性,一种方法是简单地附加一个“then”回调
每个承诺,然后等待他们全部完成
使用 Promise.all()
.
const fetch_promises = [
fetch("https://cors-demo.glitch.me/allow-cors"),
fetch("/"),
fetch("."),
fetch(""),
fetch("https://enable-cors.org"),
fetch("https://html5rocks-cors.s3-website-us-east-1.amazonaws.com/index.html"),
fetch("https://api.github.com"),
fetch("https://api.flickr.com/services/rest/"),
];
const processing_promises = [];
for (const fetch_promise of fetch_promises) {
processing_promises.push(fetch_promise.then(response => {
// Process response. In this example, that means just
// print it.
console.log("got a response: ",response);
}));
}
await Promise.all(processing_promises);
切换到输出更清晰、更确定的示例:
const sleep = millis => new Promise(resolve=>setTimeout(resolve, millis));
const sleep_promises = [
sleep(3000).then(()=>"slept 3000"),
sleep(1000).then(()=>"slept 1000"),
sleep(5000).then(()=>"slept 5000"),
sleep(4000).then(()=>"slept 4000"),
sleep(2000).then(()=>"slept 2000"),
];
const processing_promises = [];
for (const sleep_promise of sleep_promises) {
processing_promises.push(sleep_promise.then(result => {
console.log("promise resolved: ",result);
}));
}
await Promise.all(processing_promises);
输出符合预期:
15:54:16.331 promise resolved: slept 1000
15:54:17.331 promise resolved: slept 2000
15:54:18.331 promise resolved: slept 3000
15:54:19.332 promise resolved: slept 4000
15:54:20.331 promise resolved: slept 5000
我的问题是:假设我想要或需要, 将上述处理表达为 "async for..of" 循环,而不是“then”回调; 所以承诺结果需要以异步的形式出现 可迭代的。我将如何转换承诺数组 到这样的异步迭代?我要的是异步 生成器函数 AwaitAsTheyCome(),将承诺列表作为输入, 随着承诺的解决,它会一个一个地产生结果。 然后我会调用该函数并进行处理,如下所示:
for await (const result of AwaitAsTheyCome(sleep_promises)) {
console.log("promise resolved: ",result);
}
它应该给出与上面相同的输出(具有相同的时间)。
以下尝试的解决方案显然行不通,但它可以让您了解我所期望的是多么简单和简短:
async function* AwaitAsTheyCome(promises) {
for (const promise of promises) {
promise.then(response => {
yield response; // WRONG
// I want to yield it from AwaitAsTheyCome,
// not from the current arrow function!
});
}
}
以下解决方案确实有效,但它比我预期的要为此编写的代码多。
async function* AwaitAsTheyCome(promises) {
// Make a list of notifier promises and
// functions that resolve those promises,
// one for each of the original promises.
const notifier_promises = [];
const notifier_resolves = [];
for (const promise of promises) {
notifier_promises.push(
new Promise(resolve=>notifier_resolves.push(resolve)));
}
const responses = [];
for (const promise of promises) {
promise.then(response => {
responses.push(response);
// send one notification (i.e. resolve the next notifier promise)
notifier_resolves.shift()();
});
}
for (const promise of promises) {
// wait for one notification
// (i.e. wait for the next notifier promise to be resolved).
await notifier_promises.shift();
// yield the corresponding response
yield responses.shift();
}
}
// Example/test usage
const sleep = millis => new Promise(resolve=>setTimeout(resolve, millis));
const sleep_promises = [
sleep(3000).then(()=>"slept 3000"),
sleep(1000).then(()=>"slept 1000"),
sleep(5000).then(()=>"slept 5000"),
sleep(4000).then(()=>"slept 4000"),
sleep(2000).then(()=>"slept 2000"),
];
for await (const result of AwaitAsTheyCome(sleep_promises)) {
console.log("promise resolved: ",result);
}
有没有更简单的方法来实现异步生成器函数AwaitAsTheyCome?
(我尝试用上面的代码制作一个 stacksnippet,但没有成功——我怀疑这是因为代码片段系统不理解新的异步生成器 and/or for await..of
语法)
这会做同样的事情,无论您在循环中放置什么代码来处理 yield 结果都会进入 onfulfilled 回调。 Promise.all()
等待所有承诺像您的循环一样完成。
const sleep = millis => new Promise(resolve=>setTimeout(resolve, millis));
const sleep_promises = [
sleep(3000).then(()=>"slept 3000"),
sleep(1000).then(()=>"slept 1000"),
sleep(5000).then(()=>"slept 5000"),
sleep(4000).then(()=>"slept 4000"),
sleep(2000).then(()=>"slept 2000"),
];
const onfulfilled = result => console.log("promise resolved: ",result);
sleep_promises.forEach( p => p.then(onfulfilled) )
await Promise.all(sleep_promises)
您可以通过
稍微简化代码- 在输入数组上只使用一个循环(尽管这可能会造成混淆)
- 不使用
responses
数组,只是履行承诺 - 不在 promises 数组上使用
.shift()
而是简单地循环它
async function* raceAll(input) {
const promises = [];
const resolvers = [];
for (const p of input) {
promises.push(new Promise(resolve=> {
resolvers.push(resolve);
}));
p.then(result => {
resolvers.shift()(result);
});
}
for (const promise of promises) {
yield promise;
}
}
如果您不喜欢所需的代码量,我建议您将此实现的队列分解为一个单独的模块。例如
function raceAll(promises) {
const queue = new AsyncBlockingQueue();
for (const p of promises) {
p.then(result => {
queue.enqueue(result);
});
}
return queue[Symbol.asyncIterator]();
}
然而,这两种实现都遗漏了一个关键问题:错误处理。如果这些承诺中的任何一个被拒绝,您将收到一个未处理的拒绝错误,这可能会使您的流程崩溃。要真正让异步迭代器拒绝下一个承诺,以便 try
/catch
围绕 for await…of
循环可以处理它,您需要执行类似
async function* raceAll(input) {
const promises = [];
const resolvers = [];
for (const p of input) {
promises.push(new Promise(resolve => {
resolvers.push(resolve);
}));
p.finally(() => {
resolvers.shift()(p);
});
// works equivalent to:
// p.then(result => {
// resolvers.shift()(result);
// }, error => {
// resolvers.shift()(Promise.reject(error));
// });
}
for (const promise of promises) {
yield promise;
}
}
用被拒绝的 promise 解决 promise 就可以了,这样我们仍然只需要一个 resolver
函数队列,而不是同时包含 resolve
和 reject
函数的队列。