Async.js - ETIMEDOUT 和回调已被调用

Async.js - ETIMEDOUT and Callback was already called

当我 运行 index.js.

时,我不断收到 ETIMEDOUTECONNRESET 错误,然后是 Callback was already called 错误

起初我以为是因为我在调用 onEachLimitItem 回调之前没有包含 return。所以我根据 async multiple callbacks documentation 包含了它。还是没有解决。我还尝试删除错误事件并删除错误事件中对 onEachLimit 的回调,但都没有用。我查看了有关 Callback already called 问题的其他 SO 问题,但因为它们与流无关,所以我没有找到解决方案。

我的理解是,如果流遇到ECONNRESET这样的错误,它会return错误事件中的回调并继续进行下一个流,但这似乎不是是这样的。似乎如果错误自行解决,即它重新连接并尝试再次将错误的流发送到 Azure 并且它起作用,然后它触发 'finish' 事件,我们得到 Callback already called.

我是否正确处理了流事件中的回调?

var Q = require('q');
var async = require('async');
var webshot = require('webshot');
var Readable = require('stream').Readable;
var azure = require('azure-storage');

var blob = azure.createBlobService('123', '112244');
var container = 'awesome';

var countries = [
    'en-us', 'es-us', 'en-au', 'de-at', 'pt-br', 'en-ca', 'fr-ca', 'cs-cz', 'ar-ly', 'es-ve',
    'da-dk', 'fi-fi', 'de-de', 'hu-hu', 'ko-kr', 'es-xl', 'en-my', 'nl-nl', 'en-nz', 'nb-no',
    'nn-no', 'pl-pl', 'ro-ro', 'ru-ru', 'ca-es', 'es-es', 'eu-es', 'gl-es', 'en-gb', 'es-ar',
    'nl-be', 'bg-bg', 'es-cl', 'zh-cn', 'es-co', 'es-cr', 'es-ec', 'et-ee', 'fr-fr', 'el-gr',
    'zh-hk', 'en-in', 'id-id', 'en-ie', 'he-il', 'it-it', 'ja-jp', 'es-mx', 'es-pe', 'en-ph'
];

var uploadStreamToStorage = function (fileName, stream, onEachLimitItem) {
    var readable = new Readable().wrap(stream);
    var writeable = blob.createWriteStreamToBlockBlob(container, fileName);

    readable.pipe(writeable);

    writeable.on('error', function (error) {
        return onEachLimitItem.call(error);
    });

    writeable.on('finish', function () {
        onEachLimitItem.call(null);
    });
};

var takeIndividualScreenshot = function (ID, country, onEachLimitItem) {
    var fileName = ID + '-' + country + '.jpg';
    var url = 'https://example.com/' + country + '/' + ID;

    webshot(url, function (error, stream) {
        if (error) { throw 'Screenshot not taken'; }

        uploadStreamToStorage(fileName, stream, onEachLimitItem);

    });
};

var getAllCountriesOfId = function (ID) {
    var deferred = Q.defer();
    var limit = 5;

    function onEachCountry(country, onEachLimitItem) {
        takeIndividualScreenshot(ID, country, onEachLimitItem);
    }

    async.eachLimit(countries, limit, onEachCountry, function (error) {
        if (error) { deferred.reject(error); }
        deferred.resolve();
    });

    return deferred.promise;
};

var createContainer = function () {
    var df = Q.defer();
    var self = this;

    blob.createContainerIfNotExists(this.container, this.containerOptions, function (error) {

        if (error) { df.reject(error); }

        df.resolve(self.container);
    });

    return df.promise;
};

createContainer()
    .then(function () {
        return getAllCountriesOfId('211007');
    })
    .then(function () {
        return getAllCountriesOfId('123456');
    })
    .fail(function (error) {
        log.info(error);
    });

如您所知,您正在让回调被调用两次。问题是;您是要在迭代流时停止所有错误还是要累积流中的所有错误?

有多种方法可以捕获和处理您已经在做的错误,但是因为您没有抛出错误对象导致从您的数据流进行额外的调用而导致致命错误。

您代码中的实际问题是由于您 return 的范围造成的。当您处理错误并尝试 return 回调并停止脚本执行时,小时 return 的范围是流错误处理程序的本地范围,而不是全局脚本,因此脚本继续并继续前进下一个有效流。

writeable.on('error', function (error) {
   // This 'return' is in the local scope of 'writable.on('error')'
  return onEachLimitItem.call(error);
});

它或许可以设置一个数组,然后在该函数局部范围之外处理错误。即

// Set the array's scope as global to the writable.on() error 
var errResults = [];
writeable.on('error', function (error) {
  // Push the local scoped 'error' into the global scoped 'errResults' array
  errResults.push(error);
});

writeable.on('finish', function () {
  // Are there any errors?
  return (errResults.length > 0) ?
    onEachLimitItem.call(errors) : onEachLimitItem.call(null);
});

以上只是您可以解决问题的一种方法。

我不确定您是否阅读了 Joyent(原始 node.js 语言支持者)提供的错误处理帮助,但它应该让您对处理错误时的选择有一个很好的了解。

https://www.joyent.com/developers/node/design/errors