使用异步处理高地流块
processing highland stream chunks using async
我正在使用 highland.js
处理文件,使用流读取两个分隔符之间的内容。我还按顺序使用 async.js
到 运行 一系列 http 请求。
理想情况下,我想将来自 highland 的输出 x
作为第一个函数传递给 async
系列(链),以便为从流中提取的每个块执行 HTTP 请求。
这可能吗?如果可以,如何实现?
var async = require('async');
var _ = require('highland');
_(fs.createReadStream(files[0], { encoding: 'utf8' }))
.splitBy('-----BEGIN-----\n')
.splitBy('\n-----END-----\n')
.filter(chunk => chunk !== '')
.each(function (x) {
}).done(function () {
async.series([
function(callback) {
setTimeout(function() {
console.log('Task 1');
callback(null, 1);
}, 300);
},
function(callback) {
setTimeout(function() {
console.log('Task 2');
callback(null, 2);
}, 200);
},
], function(error, results) {
console.log(results);
});
});;
您可以摆脱对 each
和 done
的调用。过滤后,可以跟进.toArray(callback)
。向回调传递一个包含高地结果的数组。你可以这样重构
var Q = require('q');
var _ = require('highland');
_(fs.createReadStream(files[0], { encoding: 'utf8' }))
.splitBy('-----BEGIN-----\n')
.splitBy('\n-----END-----\n')
.filter(chunk => chunk !== '')
.each(asyncTasks);
function asyncTasks(x) { // here, x will be each of the results from highland
async.series([
// do something with x results
function(callback) {
console.log('Task 1');
callback(null, 1);
},
// do something else with x results
function(callback) {
console.log('Task 2');
callback(null, 2);
},
], function(error, results) {
console.log(results);
});
}
here 是 toArray
文档的 link。 toArray
消耗流,就像 done
一样。如果您有任何问题,请告诉我。
老实说,我认为您最好改用 promises。虽然一部分只是个人喜好,一部分是因为它使代码更具可读性。从 what I've read 开始,async 比 promise 的性能更高,但 promises 的好处在于您可以将结果从一个函数传递到下一个函数。因此,在您的示例中,您可以在第一部分对 x
做一些事情,然后将修改后的结果传递给下一个函数,然后传递给下一个函数,依此类推。当你使用 async.series
时,你通过调用 callback(null, result)
来完成每个函数,直到你在系列的最后完成时你才得到结果,当你从所有对 callback
的调用。现在,您始终可以将结果保存到 async.series 之外的某个变量,但这会使您的代码更加混乱。如果你想用 promises 重写它,它看起来如下。我在这里使用 q
,但它只是您可以使用的众多 promise 库之一。
var async = require('async');
var _ = require('highland');
_(fs.createReadStream(files[0], { encoding: 'utf8' }))
.splitBy('-----BEGIN-----\n')
.splitBy('\n-----END-----\n')
.filter(chunk => chunk !== '')
.each(asyncTasks);
function asyncTasks(x) { // here, x will be an array of the results from highland
return asyncTask1(x)
.then(asyncTask2)
.then(asyncTask3)
}
function asyncTask1(x) {
var deferred = Q.defer();
// do some stuff
if (// some error condition) {
deferred.reject();
} else {
deferred.resolve(x); // or pass along some modified version of x
}
return deferred.promise;
}
function asyncTask2(x) {
// same structure as above
}
function asyncTask3(x) {
// same structure as above
}
现在一些异步 API 已经开始 return promises,除了接受回调,或者有时代替。因此,适应起来会是一件好事。 Promise 非常有用。您可以阅读更多关于它们的信息 here and here.
我正在使用 highland.js
处理文件,使用流读取两个分隔符之间的内容。我还按顺序使用 async.js
到 运行 一系列 http 请求。
理想情况下,我想将来自 highland 的输出 x
作为第一个函数传递给 async
系列(链),以便为从流中提取的每个块执行 HTTP 请求。
这可能吗?如果可以,如何实现?
var async = require('async');
var _ = require('highland');
_(fs.createReadStream(files[0], { encoding: 'utf8' }))
.splitBy('-----BEGIN-----\n')
.splitBy('\n-----END-----\n')
.filter(chunk => chunk !== '')
.each(function (x) {
}).done(function () {
async.series([
function(callback) {
setTimeout(function() {
console.log('Task 1');
callback(null, 1);
}, 300);
},
function(callback) {
setTimeout(function() {
console.log('Task 2');
callback(null, 2);
}, 200);
},
], function(error, results) {
console.log(results);
});
});;
您可以摆脱对 each
和 done
的调用。过滤后,可以跟进.toArray(callback)
。向回调传递一个包含高地结果的数组。你可以这样重构
var Q = require('q');
var _ = require('highland');
_(fs.createReadStream(files[0], { encoding: 'utf8' }))
.splitBy('-----BEGIN-----\n')
.splitBy('\n-----END-----\n')
.filter(chunk => chunk !== '')
.each(asyncTasks);
function asyncTasks(x) { // here, x will be each of the results from highland
async.series([
// do something with x results
function(callback) {
console.log('Task 1');
callback(null, 1);
},
// do something else with x results
function(callback) {
console.log('Task 2');
callback(null, 2);
},
], function(error, results) {
console.log(results);
});
}
here 是 toArray
文档的 link。 toArray
消耗流,就像 done
一样。如果您有任何问题,请告诉我。
老实说,我认为您最好改用 promises。虽然一部分只是个人喜好,一部分是因为它使代码更具可读性。从 what I've read 开始,async 比 promise 的性能更高,但 promises 的好处在于您可以将结果从一个函数传递到下一个函数。因此,在您的示例中,您可以在第一部分对 x
做一些事情,然后将修改后的结果传递给下一个函数,然后传递给下一个函数,依此类推。当你使用 async.series
时,你通过调用 callback(null, result)
来完成每个函数,直到你在系列的最后完成时你才得到结果,当你从所有对 callback
的调用。现在,您始终可以将结果保存到 async.series 之外的某个变量,但这会使您的代码更加混乱。如果你想用 promises 重写它,它看起来如下。我在这里使用 q
,但它只是您可以使用的众多 promise 库之一。
var async = require('async');
var _ = require('highland');
_(fs.createReadStream(files[0], { encoding: 'utf8' }))
.splitBy('-----BEGIN-----\n')
.splitBy('\n-----END-----\n')
.filter(chunk => chunk !== '')
.each(asyncTasks);
function asyncTasks(x) { // here, x will be an array of the results from highland
return asyncTask1(x)
.then(asyncTask2)
.then(asyncTask3)
}
function asyncTask1(x) {
var deferred = Q.defer();
// do some stuff
if (// some error condition) {
deferred.reject();
} else {
deferred.resolve(x); // or pass along some modified version of x
}
return deferred.promise;
}
function asyncTask2(x) {
// same structure as above
}
function asyncTask3(x) {
// same structure as above
}
现在一些异步 API 已经开始 return promises,除了接受回调,或者有时代替。因此,适应起来会是一件好事。 Promise 非常有用。您可以阅读更多关于它们的信息 here and here.