在 forEach 循环中使用 async/await

Using async/await with a forEach loop

forEach 循环中使用 async/await 有什么问题吗?我正在尝试遍历文件数组并 await 每个文件的内容。

import fs from 'fs-promise'

async function printFiles () {
  const files = await getFilePaths() // Assume this works fine

  files.forEach(async (file) => {
    const contents = await fs.readFile(file, 'utf8')
    console.log(contents)
  })
}

printFiles()

这段代码确实有效,但是这会不会有问题?有人告诉我你不应该在像这样的高阶函数中使用 async/await,所以我只想问问这是否有任何问题。

确定代码确实有效,但我很确定它没有按照您的预期进行。它只是触发多个异步调用,但 printFiles 函数会在之后立即执行 return。

按顺序阅读

如果要顺序读取文件,确实不能使用forEach。只需使用现代 for … of 循环,其中 await 将按预期工作:

async function printFiles () {
  const files = await getFilePaths();

  for (const file of files) {
    const contents = await fs.readFile(file, 'utf8');
    console.log(contents);
  }
}

并行阅读

如果要并行读取文件,确实不能使用forEach。每个 async 回调函数调用都会 return 一个承诺,但您将它们扔掉而不是等待它们。只需使用 map 即可,您可以等待使用 Promise.all:

获得的承诺数组
async function printFiles () {
  const files = await getFilePaths();

  await Promise.all(files.map(async (file) => {
    const contents = await fs.readFile(file, 'utf8')
    console.log(contents)
  }));
}

npm 上的 p-iteration 模块实现了数组迭代方法,因此它们可以非常直接地用于 async/await。

您的案例示例:

const { forEach } = require('p-iteration');
const fs = require('fs-promise');

(async function printFiles () {
  const files = await getFilePaths();

  await forEach(files, async (file) => {
    const contents = await fs.readFile(file, 'utf8');
    console.log(contents);
  });
})();

上面的两种解决方案都有效,但是,Antonio 的代码更少,下面是它如何帮助我从数据库中解析数据,从几个不同的子引用中解析数据,然后将它们全部推入一个数组并将其解析为完成后的承诺:

Promise.all(PacksList.map((pack)=>{
    return fireBaseRef.child(pack.folderPath).once('value',(snap)=>{
        snap.forEach( childSnap => {
            const file = childSnap.val()
            file.id = childSnap.key;
            allItems.push( file )
        })
    })
})).then(()=>store.dispatch( actions.allMockupItems(allItems)))

在一个文件中弹出几个方法非常轻松,这些方法将按序列化顺序处理异步数据并为您的代码提供更传统的风格。例如:

module.exports = function () {
  var self = this;

  this.each = async (items, fn) => {
    if (items && items.length) {
      await Promise.all(
        items.map(async (item) => {
          await fn(item);
        }));
    }
  };

  this.reduce = async (items, fn, initialValue) => {
    await self.each(
      items, async (item) => {
        initialValue = await fn(initialValue, item);
      });
    return initialValue;
  };
};

现在,假设它保存在“./myAsync.js”,您可以在相邻文件中执行类似于以下的操作:

...
/* your server setup here */
...
var MyAsync = require('./myAsync');
var Cat = require('./models/Cat');
var Doje = require('./models/Doje');
var example = async () => {
  var myAsync = new MyAsync();
  var doje = await Doje.findOne({ name: 'Doje', noises: [] }).save();
  var cleanParams = [];

  // FOR EACH EXAMPLE
  await myAsync.each(['bork', 'concern', 'heck'], 
    async (elem) => {
      if (elem !== 'heck') {
        await doje.update({ $push: { 'noises': elem }});
      }
    });

  var cat = await Cat.findOne({ name: 'Nyan' });

  // REDUCE EXAMPLE
  var friendsOfNyanCat = await myAsync.reduce(cat.friends,
    async (catArray, friendId) => {
      var friend = await Friend.findById(friendId);
      if (friend.name !== 'Long cat') {
        catArray.push(friend.name);
      }
    }, []);
  // Assuming Long Cat was a friend of Nyan Cat...
  assert(friendsOfNyanCat.length === (cat.friends.length - 1));
}

一个重要的警告是:await + for .. of方法和forEach + async方法实际上有不同的效果。

在真正的 for 循环中使用 await 将确保所有异步调用都一一执行。 forEach + async 方式将同时触发所有承诺,速度更快但有时会不知所措(如果您进行一些数据库查询或访问一些具有容量限制的 Web 服务 和不想一次触发 100,000 个调用)。

如果您不使用 async/await 并且想要确保文件被读取 一个接一个 ,您也可以使用 reduce + promise(不太优雅)。

files.reduce((lastPromise, file) => 
 lastPromise.then(() => 
   fs.readFile(file, 'utf8')
 ), Promise.resolve()
)

或者您可以创建一个 forEachAsync 来提供帮助,但基本上使用相同的 for 循环底层。

Array.prototype.forEachAsync = async function(cb){
    for(let x of this){
        await cb(x);
    }
}

我会使用经过充分测试的(每周数百万次下载)pify and async modules. If you are unfamiliar with the async module, I highly recommend you check out its docs。我见过多个开发人员浪费时间重新创建它的方法,或者更糟的是,当高阶异步方法可以简化代码时,制作难以维护的异步代码。

const async = require('async')
const fs = require('fs-promise')
const pify = require('pify')

async function getFilePaths() {
    return Promise.resolve([
        './package.json',
        './package-lock.json',
    ]);
}

async function printFiles () {
  const files = await getFilePaths()

  await pify(async.eachSeries)(files, async (file) => {  // <-- run in series
  // await pify(async.each)(files, async (file) => {  // <-- run in parallel
    const contents = await fs.readFile(file, 'utf8')
    console.log(contents)
  })
  console.log('HAMBONE')
}

printFiles().then(() => {
    console.log('HAMBUNNY')
})
// ORDER OF LOGS:
// package.json contents
// package-lock.json contents
// HAMBONE
// HAMBUNNY
```

除了,我想提供第三种选择。它与@Bergi 的第二个示例非常相似,但不是单独等待每个 readFile,而是创建一个承诺数组,每个承诺都在最后等待。

import fs from 'fs-promise';
async function printFiles () {
  const files = await getFilePaths();

  const promises = files.map((file) => fs.readFile(file, 'utf8'))

  const contents = await Promise.all(promises)

  contents.forEach(console.log);
}

请注意,传递给 .map() 的函数不需要是 async,因为 fs.readFile returns 无论如何都是一个 Promise 对象。因此promises是一个Promise对象数组,可以发送到Promise.all().

在@Bergi 的回答中,控制台可能会按照读取顺序记录文件内容。例如,如果一个非常小的文件在一个非常大的文件之前完成读取,它将首先被记录,即使小文件在 files 数组中的大文件之后。但是,在我上面的方法中,您可以保证控制台将按照与提供的数组相同的顺序记录文件。

使用 Task、futurize 和一个可遍历的列表,你可以简单地做

async function printFiles() {
  const files = await getFiles();

  List(files).traverse( Task.of, f => readFile( f, 'utf-8'))
    .fork( console.error, console.log)
}

这是您的设置方式

import fs from 'fs';
import { futurize } from 'futurize';
import Task from 'data.task';
import { List } from 'immutable-ext';

const future = futurizeP(Task)
const readFile = future(fs.readFile)

构建所需代码的另一种方法是

const printFiles = files => 
  List(files).traverse( Task.of, fn => readFile( fn, 'utf-8'))
    .fork( console.error, console.log)

或者更注重功能

// 90% of encodings are utf-8, making that use case super easy is prudent

// handy-library.js
export const readFile = f =>
  future(fs.readFile)( f, 'utf-8' )

export const arrayToTaskList = list => taskFn => 
  List(files).traverse( Task.of, taskFn ) 

export const readFiles = files =>
  arrayToTaskList( files, readFile )

export const printFiles = files => 
  readFiles(files).fork( console.error, console.log)

然后从parent函数

async function main() {
  /* awesome code with side-effects before */
  printFiles( await getFiles() );
  /* awesome code with side-effects after */
}

如果你真的想要更灵活的编码,你可以这样做(为了好玩,我使用建议的 Pipe Forward operator

import { curry, flip } from 'ramda'

export const readFile = fs.readFile 
  |> future,
  |> curry,
  |> flip

export const readFileUtf8 = readFile('utf-8')

PS - 我没有在控制台上尝试过这段代码,可能有一些拼写错误...... "straight freestyle, off the top of the dome!" 正如 90 年代的孩子们所说的那样。 :-p

这里有一些 forEachAsync 原型。请注意,您需要 await 他们:

Array.prototype.forEachAsync = async function (fn) {
    for (let t of this) { await fn(t) }
}

Array.prototype.forEachAsyncParallel = async function (fn) {
    await Promise.all(this.map(fn));
}

注意虽然您可以将其包含在您自己的代码中,但您不应将其包含在您分发给其他人的库中(以避免污染他们的全局变量)。

而不是 Promise.allArray.prototype.map 结合使用(不保证 Promise 的解析顺序),我使用 Array.prototype.reduce,从已解决 Promise:

async function printFiles () {
  const files = await getFilePaths();

  await files.reduce(async (promise, file) => {
    // This line will wait for the last async function to finish.
    // The first iteration uses an already resolved Promise
    // so, it will immediately continue.
    await promise;
    const contents = await fs.readFile(file, 'utf8');
    console.log(contents);
  }, Promise.resolve());
}

使用 ES2018,您可以大大简化以上所有问题的答案:

async function printFiles () {
  const files = await getFilePaths()

  for await (const contents of files.map(file => fs.readFile(file, 'utf8'))) {
    console.log(contents)
  }
}

查看规范:proposal-async-iteration


2018-09-10:这个答案最近受到了很多关注,关于异步迭代的更多信息请参见Axel Rauschmayer's blog post

类似于 Antonio Val 的 , an alternative npm module is async-af:

const AsyncAF = require('async-af');
const fs = require('fs-promise');

function printFiles() {
  // since AsyncAF accepts promises or non-promises, there's no need to await here
  const files = getFilePaths();

  AsyncAF(files).forEach(async file => {
    const contents = await fs.readFile(file, 'utf8');
    console.log(contents);
  });
}

printFiles();

或者,async-af 有一个记录承诺结果的静态方法 (log/logAF):

const AsyncAF = require('async-af');
const fs = require('fs-promise');

function printFiles() {
  const files = getFilePaths();

  AsyncAF(files).forEach(file => {
    AsyncAF.log(fs.readFile(file, 'utf8'));
  });
}

printFiles();

但是,该库的主要优点是您可以链接异步方法来执行以下操作:

const aaf = require('async-af');
const fs = require('fs-promise');

const printFiles = () => aaf(getFilePaths())
  .map(file => fs.readFile(file, 'utf8'))
  .forEach(file => aaf.log(file));

printFiles();

async-af

目前 Array.forEach 原型 属性 不支持异步操作,但我们可以创建自己的 poly-fill 来满足我们的需要。

// Example of asyncForEach Array poly-fill for NodeJs
// file: asyncForEach.js
// Define asynForEach function 
async function asyncForEach(iteratorFunction){
  let indexer = 0
  for(let data of this){
    await iteratorFunction(data, indexer)
    indexer++
  }
}
// Append it as an Array prototype property
Array.prototype.asyncForEach = asyncForEach
module.exports = {Array}

就是这样!您现在可以在这些操作之后定义的任何数组上使用异步 forEach 方法。

让我们测试一下...

// Nodejs style
// file: someOtherFile.js

const readline = require('readline')
Array = require('./asyncForEach').Array
const log = console.log

// Create a stream interface
function createReader(options={prompt: '>'}){
  return readline.createInterface({
    input: process.stdin
    ,output: process.stdout
    ,prompt: options.prompt !== undefined ? options.prompt : '>'
  })
}
// Create a cli stream reader
async function getUserIn(question, options={prompt:'>'}){
  log(question)
  let reader = createReader(options)
  return new Promise((res)=>{
    reader.on('line', (answer)=>{
      process.stdout.cursorTo(0, 0)
      process.stdout.clearScreenDown()
      reader.close()
      res(answer)
    })
  })
}

let questions = [
  `What's your name`
  ,`What's your favorite programming language`
  ,`What's your favorite async function`
]
let responses = {}

async function getResponses(){
// Notice we have to prepend await before calling the async Array function
// in order for it to function as expected
  await questions.asyncForEach(async function(question, index){
    let answer = await getUserIn(question)
    responses[question] = answer
  })
}

async function main(){
  await getResponses()
  log(responses)
}
main()
// Should prompt user for an answer to each question and then 
// log each question and answer as an object to the terminal

我们可以对其他一些数组函数做同样的事情,比如 map...

async function asyncMap(iteratorFunction){
  let newMap = []
  let indexer = 0
  for(let data of this){
    newMap[indexer] = await iteratorFunction(data, indexer, this)
    indexer++
  }
  return newMap
}

Array.prototype.asyncMap = asyncMap

...等等:)

一些注意事项:

  • 您的 iteratorFunction 必须是异步函数或 promise
  • Array.prototype.<yourAsyncFunc> = <yourAsyncFunc> 之前创建的任何数组都没有此功能可用

fs 是基于 promise 的时候效果很好。 您可以为此使用 bluebirdfs-extrafs-promise

但是节点的原生fs库的解决方案如下:

const result = await Promise.all(filePaths
    .map( async filePath => {
      const fileContents = await getAssetFromCache(filePath, async function() {

        // 1. Wrap with Promise    
        // 2. Return the result of the Promise
        return await new Promise((res, rej) => {
          fs.readFile(filePath, 'utf8', function(err, data) {
            if (data) {
              res(data);
            }
          });
        });
      });

      return fileContents;
    }));

注: require('fs') 强制将函数作为第三个参数,否则抛出错误:

TypeError [ERR_INVALID_CALLBACK]: Callback must be a function

要查看它是如何出错的,请在方法末尾打印 console.log。

一般情况下可能出错的地方:

  • 任意顺序。
  • printFiles 可以在打印文件之前完成 运行。
  • 表现不佳。

这些并不总是错误的,但在标准用例中经常是错误的。

一般来说,使用 forEach 会得到除最后一个以外的所有结果。它会在不等待函数的情况下调用每个函数,这意味着它告诉所有函数开始然后结束而不等待函数完成。

import fs from 'fs-promise'

async function printFiles () {
  const files = (await getFilePaths()).map(file => fs.readFile(file, 'utf8'))

  for(const file of files)
    console.log(await file)
}

printFiles()

这是本机 JS 中的一个示例,它将保持顺序,防止函数过早返回并在理论上保持最佳性能。

这将:

  • 启动并行发生的所有文件读取。
  • 通过使用 map 将文件名映射到要等待的承诺来保留顺序。
  • 按照数组定义的顺序等待每个承诺。

使用此解决方案,第一个文件将在可用时立即显示,而无需等待其他文件先可用。

它还将同时加载所有文件,而不是必须等待第一个文件完成才能开始读取第二个文件。

这个版本和原始版本的唯一缺点是,如果一次开始多次读取,那么由于一次可能发生更多错误,因此处理错误会更加困难。

对于一次读取一个文件的版本,然后将在失败时停止,而不会浪费时间尝试读取更多文件。即使使用精心设计的取消系统,也很难避免它在第一个文件上失败,但也已经读取了大部分其他文件。

性能并不总是可以预测的。虽然许多系统使用并行文件读取会更快,但有些系统更喜欢顺序读取。有些是动态的,可能会在负载下发生变化,提供延迟的优化在激烈竞争下并不总能产生良好的吞吐量。

该示例中也没有错误处理。如果某些事情要求它们要么全部成功显示,要么根本不显示,它不会那样做。

建议在每个阶段使用 console.log 和假文件读取解决方案(改为随机延迟)进行深入实验。尽管许多解决方案似乎在简单情况下都具有相同的作用,但它们都有细微差别,需要额外仔细检查才能找出。

使用这个模拟来帮助区分解决方案:

(async () => {
  const start = +new Date();
  const mock = () => {
    return {
      fs: {readFile: file => new Promise((resolve, reject) => {
        // Instead of this just make three files and try each timing arrangement.
        // IE, all same, [100, 200, 300], [300, 200, 100], [100, 300, 200], etc.
        const time = Math.round(100 + Math.random() * 4900);
        console.log(`Read of ${file} started at ${new Date() - start} and will take ${time}ms.`)
        setTimeout(() => {
          // Bonus material here if random reject instead.
          console.log(`Read of ${file} finished, resolving promise at ${new Date() - start}.`);
          resolve(file);
        }, time);
      })},
      console: {log: file => console.log(`Console Log of ${file} finished at ${new Date() - start}.`)},
      getFilePaths: () => ['A', 'B', 'C', 'D', 'E']
    };
  };

  const printFiles = (({fs, console, getFilePaths}) => {
    return async function() {
      const files = (await getFilePaths()).map(file => fs.readFile(file, 'utf8'));

      for(const file of files)
        console.log(await file);
    };
  })(mock());

  console.log(`Running at ${new Date() - start}`);
  await printFiles();
  console.log(`Finished running at ${new Date() - start}`);
})();

今天我遇到了多种解决方案。 运行 forEach 循环中的异步等待函数。通过构建包装器,我们可以做到这一点。

More detailed explanation on how it works internally, for the native forEach and why it is not able to make a async function call and other details on the various methods are provided in link here

可以通过多种方式完成,如下所示,

方法 1:使用包装器。

await (()=>{
     return new Promise((resolve,reject)=>{
       items.forEach(async (item,index)=>{
           try{
               await someAPICall();
           } catch(e) {
              console.log(e)
           }
           count++;
           if(index === items.length-1){
             resolve('Done')
           }
         });
     });
    })();

方法二:使用same作为Array.prototype

的泛型函数

Array.prototype.forEachAsync.js

if(!Array.prototype.forEachAsync) {
    Array.prototype.forEachAsync = function (fn){
      return new Promise((resolve,reject)=>{
        this.forEach(async(item,index,array)=>{
            await fn(item,index,array);
            if(index === array.length-1){
                resolve('done');
            }
        })
      });
    };
  }

用法:

require('./Array.prototype.forEachAsync');

let count = 0;

let hello = async (items) => {

// Method 1 - Using the Array.prototype.forEach 

    await items.forEachAsync(async () => {
         try{
               await someAPICall();
           } catch(e) {
              console.log(e)
           }
        count++;
    });

    console.log("count = " + count);
}

someAPICall = () => {
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            resolve("done") // or reject('error')
        }, 100);
    })
}

hello(['', '', '', '']); // hello([]) empty array is also be handled by default

方法三:

使用Promise.all

  await Promise.all(items.map(async (item) => {
        await someAPICall();
        count++;
    }));

    console.log("count = " + count);

方法 4:传统 for 循环或现代 for 循环

// Method 4 - using for loop directly

// 1. Using the modern for(.. in..) loop
   for(item in items){

        await someAPICall();
        count++;
    }

//2. Using the traditional for loop 

    for(let i=0;i<items.length;i++){

        await someAPICall();
        count++;
    }


    console.log("count = " + count);

只是添加到原来的答案

  • 原答案中的并列阅读语法有时会令人困惑和难以阅读,也许我们可以换一种方式来写
async function printFiles() {
  const files = await getFilePaths();
  const fileReadPromises = [];

  const readAndLogFile = async filePath => {
    const contents = await fs.readFile(file, "utf8");
    console.log(contents);
    return contents;
  };

  files.forEach(file => {
    fileReadPromises.push(readAndLogFile(file));
  });

  await Promise.all(fileReadPromises);
}

  • 对于顺序操作,不仅仅是for...of,普通的for循环也可以
async function printFiles() {
  const files = await getFilePaths();

  for (let i = 0; i < files.length; i++) {
    const file = files[i];
    const contents = await fs.readFile(file, "utf8");
    console.log(contents);
  }
}

与@Bergi 的回复相似,但有一处不同。

Promise.all 拒绝所有被拒绝的承诺。

所以,使用递归。

const readFilesQueue = async (files, index = 0) {
    const contents = await fs.readFile(files[index], 'utf8')
    console.log(contents)

    return files.length <= index
        ? readFilesQueue(files, ++index)
        : files

}

const printFiles async = () => {
    const files = await getFilePaths();
    const printContents = await readFilesQueue(files)

    return printContents
}

printFiles()

PS

readFilesQueueprintFiles 之外导致由 console.log 引入的副作用*,最好模拟、测试和/或监视这样,拥有一个功能并不酷returns 内容(旁注)。

因此,代码可以简单地设计为:三个独立的函数 "pure"** 并且不引入副作用,处理整个列表并且可以轻松修改以处理失败的情况。

const files = await getFilesPath()

const printFile = async (file) => {
    const content = await fs.readFile(file, 'utf8')
    console.log(content)
}

const readFiles = async = (files, index = 0) => {
    await printFile(files[index])

    return files.lengh <= index
        ? readFiles(files, ++index)
        : files
}

readFiles(files)

未来edit/current状态

Node 支持顶级 await(这还没有插件,不会有并且可以通过 harmony flags 启用),它很酷但没有解决一个问题(从战略上讲我只在 LTS 上工作版本)。如何获取文件?

使用组合。鉴于代码,让我感觉这是在模块内部,因此应该有一个函数来执行它。如果没有,您应该使用 IIFE 将角色代码包装到一个异步函数中,创建一个简单的模块,为您完成所有工作,或者您可以使用正确的方法,即组合。

// more complex version with IIFE to a single module
(async (files) => readFiles(await files())(getFilesPath)

请注意,变量的名称会因语义而改变。您传递一个仿函数(一个可以被另一个函数调用的函数)并接收一个内存指针,该指针包含应用程序的初始逻辑块。

但是,如果不是模块,您需要导出逻辑?

将函数包装在异步函数中。

export const readFilesQueue = async () => {
    // ... to code goes here
}

或者更改变量的名称,随便...


* by side effect 意味着应用程序的任何协同效应,可以改变 statate/behaviour 或在应用程序中引入错误,如 IO.

** by "pure",它是撇号,因为它不是纯函数,代码可以收敛到纯版本,当没有控制台输出时,只有数据操作。

除此之外,为了纯粹,您将需要使用处理副作用的 monad,它们容易出错,并且与应用程序分开处理该错误。

此解决方案还针对内存进行了优化,因此您可以 运行 它处理 10,000 个数据项和请求。这里的一些其他解决方案会使服务器在大数据集上崩溃。

在打字稿中:

export async function asyncForEach<T>(array: Array<T>, callback: (item: T, index: number) => Promise<void>) {
        for (let index = 0; index < array.length; index++) {
            await callback(array[index], index);
        }
    }

如何使用?

await asyncForEach(receipts, async (eachItem) => {
    await ...
})

可以使用Array.prototype.forEach,但是async/await不太兼容。这是因为从异步回调返回的 promise 预计会得到解决,但 Array.prototype.forEach 不会解决其回调执行中的任何 promises。那么,您可以使用 forEach,但您必须自己处理 promise 解析。

这是一种使用Array.prototype.forEach

连续读取和打印每个文件的方法
async function printFilesInSeries () {
  const files = await getFilePaths()

  let promiseChain = Promise.resolve()
  files.forEach((file) => {
    promiseChain = promiseChain.then(() => {
      fs.readFile(file, 'utf8').then((contents) => {
        console.log(contents)
      })
    })
  })
  await promiseChain
}

这里有一个方法(仍然使用Array.prototype.forEach)并行打印文件的内容

async function printFilesInParallel () {
  const files = await getFilePaths()

  const promises = []
  files.forEach((file) => {
    promises.push(
      fs.readFile(file, 'utf8').then((contents) => {
        console.log(contents)
      })
    )
  })
  await Promise.all(promises)
}

如果您想同时遍历所有元素:

async function asyncForEach(arr, fn) {
  await Promise.all(arr.map(fn));
}

如果您想非并发地遍历所有元素(例如,当您的映射函数有副作用或 运行 一次遍历所有数组元素的映射器会占用太多资源):

选项 A:承诺

function asyncForEachStrict(arr, fn) {
  return new Promise((resolve) => {
    arr.reduce(
      (promise, cur, idx) => promise
        .then(() => fn(cur, idx, arr)),
      Promise.resolve(),
    ).then(() => resolve());
  });
}

选项 B:async/await

async function asyncForEachStrict(arr, fn) {
  for (let idx = 0; idx < arr.length; idx += 1) {
    const cur = arr[idx];

    await fn(cur, idx, arr);
  }
}

正如其他答案所提到的,您可能希望它按顺序而不是并行执行。 IE。 运行 第一个文件,等到它完成,然后 一旦它完成 运行 第二个文件。那不是会发生的事情。

我认为解决为什么这没有发生很重要。

想想 forEach 是如何运作的。我找不到来源,但我认为它的工作原理是这样的:

const forEach = (arr, cb) => {
  for (let i = 0; i < arr.length; i++) {
    cb(arr[i]);
  }
};

现在想想当你这样做时会发生什么:

forEach(files, async logFile(file) {
  const contents = await fs.readFile(file, 'utf8');
  console.log(contents);
});

forEachfor 循环中,我们调用 cb(arr[i]),最终成为 logFile(file)logFile 函数内部有一个 await,因此 for 循环可能会等待这个 await,然后再继续 i++?

不,不会。令人困惑的是,await 并不是这样工作的。来自 the docs:

An await splits execution flow, allowing the caller of the async function to resume execution. After the await defers the continuation of the async function, execution of subsequent statements ensues. If this await is the last expression executed by its function execution continues by returning to the function's caller a pending Promise for completion of the await's function and resuming execution of that caller.

因此,如果您有以下内容,则不会记录 "b" 之前的数字:

const delay = (ms) => {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
};

const logNumbers = async () => {
  console.log(1);
  await delay(2000);
  console.log(2);
  await delay(2000);
  console.log(3);
};

const main = () => {
  console.log("a");
  logNumbers();
  console.log("b");
};

main();

回到forEachforEach就像mainlogFile就像logNumbersmain 不会因为 logNumbers 做了一些 awaiting 而停止,forEach 不会仅仅因为 logFile 做了一些 awaiting 就停止.

替换不工作的 forEach() await 循环的简单解决方案是用 map 替换 forEach 并在开头添加 Promise.all(

例如:

await y.forEach(async (x) => {

await Promise.all(y.map(async (x) => {

最后需要一个额外的)

这是在 forEach 循环中使用异步的一个很好的例子。

编写自己的 asyncForEach

async function asyncForEach(array, callback) {  
    for (let index = 0; index < array.length; index++) {
        await callback(array[index], index, array)
    }
}

可以这样用

await asyncForEach(array, async function(item,index,array){
     //await here
   }
)

从循环中调用异步方法不好。这是因为每次循环迭代都会延迟到整个异步操作完成。那不是很高效。它还避免了 async/await.

的并行化优势

更好的解决方案是一次创建所有承诺,然后使用 Promise.all() 访问结果。否则,直到前一个操作完成后,每个后续操作才会开始。

因此,代码可以重构如下;

const printFiles = async () => {
  const files = await getFilePaths();
  const results = [];
  files.forEach((file) => {
    results.push(fs.readFile(file, 'utf8'));
  });
  const contents = await Promise.all(results);
  console.log(contents);
}

图片价值 1000 字 - 仅适用于顺序方法


背景 : 我昨晚遇到了类似的情况。我使用异步函数作为 foreach 参数。结果出乎意料。当我对我的代码进行 3 次测试时,它 运行 2 次没有问题,1 次失败。 (有点奇怪)

我终于回过神来并做了一些便笺本测试。

场景 1 - foreach 中的异步会导致多么不连续

const getPromise = (time) => { 
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve(`Promise resolved for ${time}s`)
    }, time)
  })
}

const main = async () => {
  const myPromiseArray = [getPromise(1000), getPromise(500), getPromise(3000)]
  console.log('Before For Each Loop')

  myPromiseArray.forEach(async (element, index) => {
    let result = await element;
    console.log(result);
  })

  console.log('After For Each Loop')
}

main();

场景 2 - 使用 for - of 循环作为 @Bergi 上面的建议

const getPromise = (time) => { 
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve(`Promise resolved for ${time}s`)
    }, time)
  })
}

const main = async () => {
  const myPromiseArray = [getPromise(1000), getPromise(500), getPromise(3000)]
  console.log('Before For Each Loop')

  // AVOID USING THIS
  // myPromiseArray.forEach(async (element, index) => {
  //   let result = await element;
  //   console.log(result);
  // })

  // This works well
  for (const element of myPromiseArray) {
    let result = await element;
    console.log(result)
  }

  console.log('After For Each Loop')
}

main();

如果你像我一样是个老派,你可以简单地使用经典的 for 循环,它也很有效:)

const getPromise = (time) => { 
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve(`Promise resolved for ${time}s`)
    }, time)
  })
}

const main = async () => {
  const myPromiseArray = [getPromise(1000), getPromise(500), getPromise(3000)]
  console.log('Before For Each Loop')

  // AVOID USING THIS
  // myPromiseArray.forEach(async (element, index) => {
  //   let result = await element;
  //   console.log(result);
  // })

  // This works well too - the classic for loop :)
  for (let i = 0; i < myPromiseArray.length; i++) {
    const result = await myPromiseArray[i];
    console.log(result);
  }

  console.log('After For Each Loop')
}

main();

我希望这对某人有所帮助,美好的一天,干杯!

如果你不会async/await(IE11,old packer等)那么你可以试试这个递归函数。我使用 fetch 作为我的异步调用,但您可以使用任何 returns 承诺的函数。

var urlsToGet = ['https://google.com', 'https://yahoo.com'];

fetchOneAtATime(urlsToGet);

function fetchOneAtATime(urls) {
    if (urls.length === 0) {
        return;
    }
    fetch(urls[0]).finally(() => fetchOneAtATime(urls.slice(1)));
}

您可以使用异步包中的 async.forEach 循环:

async.forEach(dataToLoop(array), async(data, cb) => {
                variable = await MongoQuery;
            }, function(err) {
                console.log(err);  
              })
            })
            .catch((err)=>{
              console.log(err);
            })

@Bergi 已经给出了如何正确处理这种特殊情况的答案。我不会在这里重复。

我想解决在 asyncawait

中使用 forEachfor 循环的区别

forEach 的工作原理

让我们看看 forEach 是如何工作的。根据 ECMAScript Specification, MDN provides an implementation 可以用作 polyfill。我将其复制并粘贴到此处并删除评论。

Array.prototype.forEach = function (callback, thisArg) {
  if (this == null) { throw new TypeError('Array.prototype.forEach called on null or undefined'); }
  var T, k;
  var O = Object(this);
  var len = O.length >>> 0;
  if (typeof callback !== "function") { throw new TypeError(callback + ' is not a function'); }
  if (arguments.length > 1) { T = thisArg; }
  k = 0;
  while (k < len) {
    var kValue;
    if (k in O) {
      kValue = O[k];
      callback.call(T, kValue, k, O); // pay attention to this line
    }
    k++;
  }
};

让我们回到您的代码,让我们将回调提取为一个函数。

async function callback(file){
  const contents = await fs.readFile(file, 'utf8')
  console.log(contents)
}

所以,基本上 callback returns 一个承诺,因为它是用 async 声明的。在 forEach 中,callback 只是以正常方式调用,如果回调本身 returns 是一个承诺,javascript 引擎将不会等待它被解决或拒绝。相反,它将 promise 放入作业队列,并继续执行循环。

callback里面的await fs.readFile(file, 'utf8')怎么样?

基本上,当你的asynccallback有机会被执行时,js引擎会暂停,直到fs.readFile(file, 'utf8')被resolved或者rejected,fulfillment之后再继续执行async函数。所以 contents 变量存储 fs.readFile 的实际结果,而不是 promise。因此,console.log(contents) 注销文件内容而不是 Promise

为什么 for ... of 有效?

当我们编写一个通用的 for of 循环时,我们比 forEach 获得更多的控制权。让我们重构 printFiles.

async function printFiles () {
  const files = await getFilePaths() // Assume this works fine

  for (const file of files) {
    const contents = await fs.readFile(file, 'utf8')
    console.log(contents)
    // or await callback(file)
  }
}

当评估 for 循环时,我们在 async 函数中有 await 承诺,执行将暂停,直到 await 承诺得到解决。所以,你可以认为文件是按照确定的顺序一个一个读取的。

顺序执行

有时,我们确实需要按顺序执行异步函数。例如,我有一些新记录存储在一个数组中要保存到数据库中,我希望它们按顺序保存,这意味着数组中的第一条记录应该首先保存,然后是第二条,直到最后一条记录被保存。

这是一个例子:

const records = [1, 2, 3, 4];

async function saveRecord(record) {
  return new Promise((resolved, rejected) => {
    setTimeout(()=> {
      resolved(`record ${record} saved`)
    }, Math.random() * 500)
  });
}

async function forEachSaveRecords(records) {
  records.forEach(async (record) => {
    const res = await saveRecord(record);
    console.log(res);
  })
}

async function forofSaveRecords(records) {
  for (const record of records) {
    const res = await saveRecord(record);
    console.log(res);
  }
}
(async () => {
  console.log("=== for of save records ===")
  await forofSaveRecords(records)
  
  console.log("=== forEach save records ===")
  await forEachSaveRecords(records)
})()

我使用 setTimeout 来模拟将记录保存到数据库的过程 - 它是异步的并且花费随机时间。使用forEach,记录以未确定的顺序保存,但使用for..of,它们按顺序保存。

这不会像 OP 要求的那样使用 async/await 并且 只有 如果您在后端使用节点JS。虽然对有些人还是有帮助的,因为OP给的例子是读取文件内容,一般都是在后台读取文件。

完全异步和非阻塞:

const fs = require("fs")
const async = require("async")

const obj = {dev: "/dev.json", test: "/test.json", prod: "/prod.json"}
const configs = {}

async.forEachOf(obj, (value, key, callback) => {
    fs.readFile(__dirname + value, "utf8", (err, data) => {
        if (err) return callback(err)
        try {
            configs[key] = JSON.parse(data);
        } catch (e) {
            return callback(e)
        }
        callback()
    });
}, err => {
    if (err) console.error(err.message)
    // configs is now a map of JSON data
    doSomethingWith(configs)
})

OP 的原始问题

Are there any issues with using async/await in a forEach loop? ...

在@Bergi 的 中进行了一定程度的介绍, 其中展示了如何串行和并行处理。然而,还有其他关于并行性的问题 -

  1. 订单 -- 注意到 -

For example if a really small file finishes reading before a really large file, it will be logged first, even if the small file comes after the large file in the files array.

  1. 可能一次打开太多文件 -- Bergi 在另一个
  2. 下的评论

It is also not good to open thousands of files at once to read them concurrently. One always has to do an assessment whether a sequential, parallel, or mixed approach is better.

因此,让我们解决这些问题,展示简洁明了的实际代码, 使用第三方库。易于剪切、粘贴和修改的内容。

并行读取(一次全部),串行打印(每个文件尽可能早)。

最简单的改进是像 中一样执行完全并行,但做一个小改动,以便在保持顺序 的同时尽快 打印每个文件。

async function printFiles2() {
  const readProms = (await getFilePaths()).map((file) =>
    fs.readFile(file, "utf8")
  );
  await Promise.all([
    await Promise.all(readProms),                      // branch 1
    (async () => {                                     // branch 2
      for (const p of readProms) console.log(await p);
    })(),
  ]);
}

以上,两个独立的分支同时运行。

  • 分支 1:同时并行读取,
  • 分支 2:串行读取以强制排序,但等待的时间不要超过必要的时间

这很简单。

并行读取并发限制,串行打印(每个文件尽可能早)。

“并发限制”意味着同时读取的文件不会超过 N 个。
就像一家商店一次只允许这么多顾客(至少在 COVID 期间)。

首先介绍一个辅助函数-

function bootablePromise(kickMe: () => Promise<any>) {
  let resolve: (value: unknown) => void = () => {};
  const promise = new Promise((res) => { resolve = res; });
  const boot = () => { resolve(kickMe()); };
  return { promise, boot };
}

函数 bootablePromise(kickMe:() => Promise<any>) 需要一个 函数 kickMe 作为启动任务的参数(在我们的例子中 readFile)。但是并没有立即启动。

bootablePromise returns 几个属性

  • promise 类型 Promise
  • boot 类型函数 ()=>void

promise人生有两个阶段

  1. 作为开始任务的承诺
  2. 作为承诺完成它已经开始的任务。
当调用 boot() 时,

promise 从第一个状态转换到第二个状态。

bootablePromise用于printFiles--

async function printFiles4() {
  const files = await getFilePaths();
  const boots: (() => void)[] = [];
  const set: Set<Promise<{ pidx: number }>> = new Set<Promise<any>>();
  const bootableProms = files.map((file,pidx) => {
    const { promise, boot } = bootablePromise(() => fs.readFile(file, "utf8"));
    boots.push(boot);
    set.add(promise.then(() => ({ pidx })));
    return promise;
  });
  const concurLimit = 2;
  await Promise.all([
    (async () => {                                       // branch 1
      let idx = 0;
      boots.slice(0, concurLimit).forEach((b) => { b(); idx++; });
      while (idx<boots.length) {
        const { pidx } = await Promise.race([...set]);
        set.delete([...set][pidx]);
        boots[idx++]();
      }
    })(),
    (async () => {                                       // branch 2
      for (const p of bootableProms) console.log(await p);
    })(),
  ]);
}

和以前一样有两个分支

  • 分支 1:用于 运行并发处理。
  • 分支 2:用于打印

现在的区别是最多允许 concurLimit promises 同时 运行。

重要的变量是

  • boots:要调用以强制其相应承诺转换的函数数组。它仅在分支 1 中使用。
  • set: 随机访问容器中有承诺,一旦实现就可以轻松删除。此容器仅在分支 1 中使用。
  • bootableProms:这些是最初在 set 中的 smae 承诺,但它是一个数组而不是一个集合,并且数组永远不会改变。它仅在分支 2 中使用。

运行 一个模拟 fs.readFile 需要如下时间(文件名与以毫秒为单位的时间)。

const timeTable = {
  "1": 600,
  "2": 500,
  "3": 400,
  "4": 300,
  "5": 200,
  "6": 100,
};

测试 运行 次出现这样的情况,表明并发正在工作 --

[1]0--0.601
[2]0--0.502
[3]0.503--0.904
[4]0.608--0.908
[5]0.905--1.105
[6]0.905--1.005

可在 typescript playground sandbox

中作为可执行文件使用
files.forEach(async (file) => {
    const contents = await fs.readFile(file, 'utf8')
})

问题是,forEach() 忽略了迭代函数返回的承诺。结果,所有 fs.readFile 函数 在事件循环的同一轮中被调用,这意味着它们是并行启动的,而不是顺序启动的,并且在调用 forEach() 之后立即继续执行,而不 等待所有 fs.readFile 操作完成。由于 forEach 不会等待每个 promise 得到解决,因此循环实际上在 promise 得到解决之前完成迭代。您最终可能会尝试访问尚不可用的值。

在 2022 年,我仍然建议使用外部库来处理所有这些异步流程。我已经为类似的事情创建了模块 alot

你的例子是:

import fs from 'fs-promise'
import alot from 'alot'

async function printFiles () {
    const files = await getFilePaths() // Assume this works fine

    await alot(files)
        .forEachAsync(async file => {
            let content = await fs.readFile(file, 'utf8');
            console.log(content);
        })
        .toArrayAsync({ threads: 4 });
    }
}
printFiles()

对于简单的示例,异步 for..of 肯定可以完成这项工作,但是一旦任务变得更加复杂,您就必须为此使用一些实用程序。

Alot 还有许多其他方法可以链接,例如 mapAsyncfilterAsyncgroupAsync

举个例子:

  • 加载 JSON 包含产品元数据的文件
  • 摘录ProductID
  • 从服务器加载产品
  • 过滤掉价格 > 100 美元的那些
  • 按价格升序排列
  • 进入前50

import fs from 'fs-promise'
import alot from 'alot'
import axios from 'axios'
import { File } from 'atma-io'

let paths = await getFilePaths();
let products = await alot(paths)
    .mapAsync(async path => await File.readAsync<IProductMeta>(path))
    .mapAsync(async meta => await axios.get(`${server}/api/product/${meta.productId}`))
    .mapAsync(resp => resp.data)
    .filterAsync(product => product.price > 100)
    .sortBy(product => product.price, 'asc')
    .takeAsync(50)
    .toArrayAsync({ threads: 5, errors: 'include' });