如何映射异步生成器?

How to map over async generators?

假设我们有一个异步生成器:

exports.asyncGen = async function* (items) {
  for (const item of items) {
    const result = await someAsyncFunc(item)
    yield result;
  }
}

可以映射这个生成器吗?基本上我想这样做:

const { asyncGen } = require('./asyncGen.js')

exports.process = async function (items) {
  return asyncGen(items).map(item => {
    //... do something
  })
}

截至目前.map无法识别异步迭代器。

另一种方法是使用 for await ... of ,但这远不如 .map

优雅

iterator methods proposal that would provide this method is still at stage 2 only. You can use some polyfill,或者编写您自己的 map 辅助函数:

async function* map(asyncIterable, callback) {
    let i = 0;
    for await (const val of asyncIterable)
        yield callback(val, i++);
}

exports.process = function(items) {
    return map(asyncGen(items), item => {
       //... do something
    });
};

The alternative is to use for await ... of, but that's nowhere near elegant as with .map

对于优雅高效的解决方案,这里是使用 iter-ops 库的解决方案:

import {pipe, map} from 'iter-ops';

const i = pipe(
    asyncGen(), // your async generator result
    map(value => /*map logic*/)
); //=> AsyncIterable
  • 它很优雅,因为语法干净、简单,适用于任何可迭代对象或迭代器,而不仅仅是异步生成器。
  • 它更灵活且可重用,因为您可以将许多其他运算符添加到同一管道。

因为它产生了一个标准 JavaScript AsyncIterable,你可以这样做:

for await(const a of i) {
    console.log(a); //=> print values
}

P.S。我是 iter-ops.

的作者

TL;DR - 如果映射函数是异步的:

要使asyncIter在生成下一个值之前不等待每个映射,请执行

async function asyncIterMap(asyncIter, asyncFunc) {
    const promises = [];
    for await (const value of asyncIter) {
        promises.push(asyncFunc(value))
    }
    return await Promise.all(promises)
}

// example - how to use:
const results = await asyncIterMap(myAsyncIter(), async (str) => {
    await sleep(3000)
    return str.toUpperCase()
});

更多演示:

// dummy asyncIter for demonstration

const sleep = (ms) => new Promise(res => setTimeout(res, ms))

async function* myAsyncIter() {
    await sleep(1000)
    yield 'first thing'
    await sleep(1000)
    yield 'second thing'
    await sleep(1000)
    yield 'third thing'
}

然后

// THIS IS BAD! our asyncIter waits for each mapping.

for await (const thing of myAsyncIter()) {
    console.log('starting with', thing)
    await sleep(3000)
    console.log('finished with', thing)
}

// total run time: ~12 seconds

更好的版本:

// this is better.

const promises = [];

for await (const thing of myAsyncIter()) {
    const task = async () => {
        console.log('starting with', thing)
        await sleep(3000)
        console.log('finished with', thing)
    };
    promises.push(task())
}

await Promise.all(promises)

// total run time: ~6 seconds