Node.js - 异步 - if 语句中的多个内部回调

Node.js - Async - multiple innercallbacks within a if statement

我正在使用 Node.js 和异步库,但是我一直看到错误:Callback was already called.

我想我明白为什么我会收到错误,但是我不知道是否真的可以执行 following/how 可以解决的问题。

基本上我希望两个内部回调都在外部回调完成之前完成。

所以我遇到这个问题的代码如下所示:

async.forEachLimit(inData, 25, function (data, innercallback) {

    myJson.matches.forEach(function (oMatches) {
        if (data.$.id == oMatches.SourceId) {

            oMatches.ids.forEach(function (odId) {
                client.execute("g.addV('test').property('id', \"" + data.$.id + "\")", {},
                    function (err) {
                        setTimeout(function () { innercallback(err) }, 2000);
                    });


                client.execute("g.V('" + data.$.id + "').addE('matches').to(g.V('xyz'))", {},
                    function (err) {
                        setTimeout(function () { innercallback(err) }, 2000);
                    });
            })

        } //there is no else case.

    });

}, outercallback);

顺便说一句 - 我正在使用 setTimeoutasync.forEachLimit 来减少对 Azure 的请求数量(因为我没有太多)

Promise可以保证异步回调的顺序

看看async#AsyncFunction in the official document async 个包裹

Wherever we accept a Node-style async function, we also directly accept an ES2017 async function. In this case, the async function will not be passed a final callback argument, and any thrown error will be used as the err argument of the implicit callback, and the return value will be used as the result value. (i.e. a rejected of the returned Promise becomes the err callback argument, and a resolved value becomes the result.)

async#forEachLimit 的第三个参数是一个 async#AsyncFunction,所以你可以 return 它的一个承诺,然后解析这个承诺以表明它的工作已经完成。

您的代码可以改进如下,

async.forEachLimit(inData, 25, function (data, innercallback) {

  // we will wait for all async function in here to complete
  // then call the innercallback
  var asyncPromises = []

  myJson.matches.forEach(function (oMatches) {

    if (data.$.id == oMatches.SourceId) {

      oMatches.ids.forEach(function (odId) {
        asyncPromises.push(new Promise(function (resolve, reject) {
          client.execute("g.addV('test').property('id', \"" + data.$.id + "\")", {},
            function (err) {
              setTimeout(function () {
                if (err) {
                  reject(err)
                  return
                }
                resolve();
              }, 2000);
            });
        }))

        asyncPromises.push(new Promise(function (resolve, reject) {
          client.execute("g.V('" + data.$.id + "').addE('matches').to(g.V('xyz'))", {},
            function (err) {
              setTimeout(function () {
                if (err) {
                  reject(err)
                  return
                }
                resolve();
              }, 2000);
            });
        }))

      })

    } //there is no else case.

  })

  // Here we can ensure that innercallback is called only for once
  Promise.all(asyncPromises)
    .then(function () {
      innercallback(null)
    })
    .catch(function (err) {
      // handle the error here
      innercallback(err)
    })

}, outercallback);

请注意,您应该确保在您的 Node 环境中有 Promise 支持。 Node v6 之后支持内置 Promise。查看 here

已更新

我误解了你的内部回调,它们必须在调用 outercallback 之前完成。我已经用 Promise.all 更正了它,它将 Promise 的数组作为参数和 return a Promise 如果所有子 Promises 将被解析如果 sub-Promise 之一被拒绝,则全部解决或拒绝。参见 Promise#all

更新(2018.05.14)

您必须确保从 async 包的 AsyncFunction(即 async#forEachLimit 的第 3 个参数)收到的每个 innercallback 只被调用 一次 在每次迭代期间。在每次迭代中执行 Array#forEach 时尤其要小心,因为它可能会使您在迭代中多次调用 innercallback

我更新了上面的代码块,我从 client.execute 的回调中删除了对 innercallback 的所有调用,并将所有 client.execute 放入一个承诺中。之后,我将所有承诺收集到 asyncPromises 数组中。 Promise#all 用于确保所有承诺都已解决(即所有 client.execute 已完成),然后最终调用 innercallback。或者,如果上面的承诺之一被拒绝并陷入 Promise#catch,调用 innercallback 并将第一个参数作为错误原因。

您正在调用 innercallback 两次,以防 client.execute 都抛出错误。您可以使用 async.parallel 函数或 Promise.all,这里是 async.parallel 的示例,您还需要在 else

中调用 innercallback 函数
async.forEachLimit(inData, 25, function(data, innercallback) {
    async.eachSeries(myJson.matches, function(oMatches, callback) {
        if (data.$.id == oMatches.SourceId) {
            async.eachSeries(oMatches.ids, function(odId, callback) {
                async.parallel([
                    function(callback) {
                        client.execute("g.addV('test').property('id', \"" + data.$.id + "\")", {}, callback);
                    },
                    function(callback) {
                        client.execute("g.V('" + data.$.id + "').addE('matches').to(g.V('xyz'))", {}, callback);
                    }
                ], callback);
            }, callback)

        } else {
            callback();
        }
    }, innercallback);
}, outercallback);

更新: 更新代码,现在使用 async.eachSeries 代替 Array.forEach