使用异步处理高地流块

processing highland stream chunks using async

我正在使用 highland.js 处理文件,使用流读取两个分隔符之间的内容。我还按顺序使用 async.js 到 运行 一系列 http 请求。

理想情况下,我想将来自 highland 的输出 x 作为第一个函数传递给 async 系列(链),以便为从流中提取的每个块执行 HTTP 请求。

这可能吗?如果可以,如何实现?

var async = require('async');
var _ = require('highland');


_(fs.createReadStream(files[0], { encoding: 'utf8' }))
        .splitBy('-----BEGIN-----\n')
        .splitBy('\n-----END-----\n')
        .filter(chunk => chunk !== '')
        .each(function (x) {
        }).done(function () {

    async.series([
        function(callback) {
            setTimeout(function() {
                console.log('Task 1');
                callback(null, 1);
            }, 300);
        },
        function(callback) {
            setTimeout(function() {
                console.log('Task 2');
                callback(null, 2);
            }, 200);
        },
    ], function(error, results) {
        console.log(results);
    });

});;

您可以摆脱对 eachdone 的调用。过滤后,可以跟进.toArray(callback)。向回调传递一个包含高地结果的数组。你可以这样重构

var Q = require('q');
var _ = require('highland');


_(fs.createReadStream(files[0], { encoding: 'utf8' }))
        .splitBy('-----BEGIN-----\n')
        .splitBy('\n-----END-----\n')
        .filter(chunk => chunk !== '')
        .each(asyncTasks);

function asyncTasks(x) { // here, x will be each of the results from highland
    async.series([
      // do something with x results
        function(callback) {
          console.log('Task 1');
          callback(null, 1);
        },
        // do something else with x results
        function(callback) {
          console.log('Task 2');
          callback(null, 2);
        },
    ], function(error, results) {
        console.log(results);
    });
}

heretoArray 文档的 link。 toArray 消耗流,就像 done 一样。如果您有任何问题,请告诉我。

老实说,我认为您最好改用 promises。虽然一部分只是个人喜好,一部分是因为它使代码更具可读性。从 what I've read 开始,async 比 promise 的性能更高,但 promises 的好处在于您可以将结果从一个函数传递到下一个函数。因此,在您的示例中,您可以在第一部分对 x 做一些事情,然后将修改后的结果传递给下一个函数,然后传递给下一个函数,依此类推。当你使用 async.series 时,你通过调用 callback(null, result) 来完成每个函数,直到你在系列的最后完成时你才得到结果,当你从所有对 callback 的调用。现在,您始终可以将结果保存到 async.series 之外的某个变量,但这会使您的代码更加混乱。如果你想用 promises 重写它,它看起来如下。我在这里使用 q,但它只是您可以使用的众多 promise 库之一。

    var async = require('async');
    var _ = require('highland');


    _(fs.createReadStream(files[0], { encoding: 'utf8' }))
            .splitBy('-----BEGIN-----\n')
            .splitBy('\n-----END-----\n')
            .filter(chunk => chunk !== '')
            .each(asyncTasks);

    function asyncTasks(x) { // here, x will be an array of the results from highland
      return asyncTask1(x)
              .then(asyncTask2)
              .then(asyncTask3)
    }

    function asyncTask1(x) {
      var deferred = Q.defer();

      // do some stuff

      if (// some error condition) {
        deferred.reject();
      } else {
        deferred.resolve(x); // or pass along some modified version of x
      }

      return deferred.promise;
    }

    function asyncTask2(x) {
      // same structure as above
    }

    function asyncTask3(x) {
      // same structure as above
    }

现在一些异步 API 已经开始 return promises,除了接受回调,或者有时代替。因此,适应起来会是一件好事。 Promise 非常有用。您可以阅读更多关于它们的信息 here and here.