如何 运行 多个节点流按顺序?
How do I run multiple node streams in sequence?
给定两个流,stream1
,stream2
,我如何按顺序 运行 它们,如果失败则抛出?
我正在寻找比这更简单的东西:
stream1.on('end',function(){
stream2.on('end',done);
});
stream1.on('error',function(error){done(error);});
stream2.on('error',function(error){done(error);});
谢谢。
有一些方法可以做到这一点,请查看下一个 link,它将有助于如何以优雅的方式在节点中编写一些代码:
希望它是你所需要的。
试试 Promise
function doStream1(cb) {
// put those operation on stream1 in the callback function...
cb && cb();
var p = new Promise(function(resolve, reject) {
stream1.on( 'end', resolve );
stream1.on( 'error', reject );
});
return p;
}
function doStream2(cb) {
// some operation on stream2 on callback
cb && cb();
var p = new Promise(function(resolve, reject) {
stream2.on( 'end', resolve );
stream2.on( 'error', reject );
});
return p;
}
doStream1(cb).then(function() {
return doStream2(cb);
}).catch(function(e) {
// error handling is here
});
根据您的用例,您可以使用 multistream 模块将两个流合并为一个。
多流由流数组构成
var MultiStream = require('multistream')
var fs = require('fs')
var streams = [
fs.createReadStream(__dirname + '/numbers/1.txt'), // contains a single char '1'
fs.createReadStream(__dirname + '/numbers/2.txt'), // contains a single char '2'
fs.createReadStream(__dirname + '/numbers/3.txt') // contains a single char '3'
]
MultiStream(streams).pipe(process.stdout) // => 123
如果组合流不适合您的用例,您可以自行构建基于结束事件发送功能的流
const fs = require('fs');
var number1 = fs.createReadStream('./numbers1.txt')
.on('data', d => console.log(d.toString()));
var number2 = fs.createReadStream('./numbers2.txt')
.on('data', d => console.log(d.toString()));
onEnd([number1, number2], function(err) {
console.log('Ended with', err);
});
function onEnd(streams, cb) {
var count = streams.length;
var ended = 0;
var errored = null;
function shouldEnd() {
ended++;
if (errored) { return; }
if (count == ended) {
cb();
}
}
function endWithError(err) {
if (errored) { return; }
errored = true;
cb(err);
}
streams.forEach(s => s
.on('end', shouldEnd)
.on('error', endWithError)
);
}
onEnd
函数可用于等待流数组结束,或者在为第一个发出的错误事件发出错误事件的情况下。
尝试使用 async
函数:
const { createReadStream } = require("fs")
async function main() {
const stream1 = createReadStream(__dirname + "/1.txt")
await pipe(stream1, process.stdout)
const stream2 = createReadStream(__dirname + "/2.txt")
await pipe(stream2, process.stdout)
const stream3 = createReadStream(__dirname + "/3.txt")
await pipe(stream3, process.stdout)
}
async function pipe(tap, sink) {
return new Promise((resolve, reject) => {
tap.pipe(sink, { end: false })
tap.on("end", resolve)
tap.on("error", reject)
})
}
尝试下面的代码(sequenceStream
函数)。我担心的是错误处理,但它应该可以工作。
另外,如果你使用 Node < 10.* 那么你需要 end-of-stream 而不是 stream.finished
const stream = require('stream');
const process = require('process')
const { promisify } = require('util')
const fromList = (lst) => new stream.Readable({
read() {
if (lst.length) {
this.push(String(lst.shift()))
} else {
this.push(null)
}
}
})
const _finished = promisify(stream.finished)
const sequenceStream = (streams) => {
const resultingStream = new stream.PassThrough()
let isNext = Promise.resolve()
for(const [i, curStream] of streams.entries()) {
isNext = isNext.then(() => {
curStream.pipe(resultingStream, {end: i === streams.length -1})
return _finished(curStream)
}).catch((err) => {
resultingStream.write(err)
})
}
return resultingStream
}
sequenceStream([
fromList([1, 2, 3, 4]),
fromList([5, 6, 7, 8]),
fromList([9, 10])
]).pipe(process.stdout)
给定两个流,stream1
,stream2
,我如何按顺序 运行 它们,如果失败则抛出?
我正在寻找比这更简单的东西:
stream1.on('end',function(){
stream2.on('end',done);
});
stream1.on('error',function(error){done(error);});
stream2.on('error',function(error){done(error);});
谢谢。
有一些方法可以做到这一点,请查看下一个 link,它将有助于如何以优雅的方式在节点中编写一些代码:
希望它是你所需要的。
试试 Promise
function doStream1(cb) {
// put those operation on stream1 in the callback function...
cb && cb();
var p = new Promise(function(resolve, reject) {
stream1.on( 'end', resolve );
stream1.on( 'error', reject );
});
return p;
}
function doStream2(cb) {
// some operation on stream2 on callback
cb && cb();
var p = new Promise(function(resolve, reject) {
stream2.on( 'end', resolve );
stream2.on( 'error', reject );
});
return p;
}
doStream1(cb).then(function() {
return doStream2(cb);
}).catch(function(e) {
// error handling is here
});
根据您的用例,您可以使用 multistream 模块将两个流合并为一个。
多流由流数组构成
var MultiStream = require('multistream')
var fs = require('fs')
var streams = [
fs.createReadStream(__dirname + '/numbers/1.txt'), // contains a single char '1'
fs.createReadStream(__dirname + '/numbers/2.txt'), // contains a single char '2'
fs.createReadStream(__dirname + '/numbers/3.txt') // contains a single char '3'
]
MultiStream(streams).pipe(process.stdout) // => 123
如果组合流不适合您的用例,您可以自行构建基于结束事件发送功能的流
const fs = require('fs');
var number1 = fs.createReadStream('./numbers1.txt')
.on('data', d => console.log(d.toString()));
var number2 = fs.createReadStream('./numbers2.txt')
.on('data', d => console.log(d.toString()));
onEnd([number1, number2], function(err) {
console.log('Ended with', err);
});
function onEnd(streams, cb) {
var count = streams.length;
var ended = 0;
var errored = null;
function shouldEnd() {
ended++;
if (errored) { return; }
if (count == ended) {
cb();
}
}
function endWithError(err) {
if (errored) { return; }
errored = true;
cb(err);
}
streams.forEach(s => s
.on('end', shouldEnd)
.on('error', endWithError)
);
}
onEnd
函数可用于等待流数组结束,或者在为第一个发出的错误事件发出错误事件的情况下。
尝试使用 async
函数:
const { createReadStream } = require("fs")
async function main() {
const stream1 = createReadStream(__dirname + "/1.txt")
await pipe(stream1, process.stdout)
const stream2 = createReadStream(__dirname + "/2.txt")
await pipe(stream2, process.stdout)
const stream3 = createReadStream(__dirname + "/3.txt")
await pipe(stream3, process.stdout)
}
async function pipe(tap, sink) {
return new Promise((resolve, reject) => {
tap.pipe(sink, { end: false })
tap.on("end", resolve)
tap.on("error", reject)
})
}
尝试下面的代码(sequenceStream
函数)。我担心的是错误处理,但它应该可以工作。
另外,如果你使用 Node < 10.* 那么你需要 end-of-stream 而不是 stream.finished
const stream = require('stream');
const process = require('process')
const { promisify } = require('util')
const fromList = (lst) => new stream.Readable({
read() {
if (lst.length) {
this.push(String(lst.shift()))
} else {
this.push(null)
}
}
})
const _finished = promisify(stream.finished)
const sequenceStream = (streams) => {
const resultingStream = new stream.PassThrough()
let isNext = Promise.resolve()
for(const [i, curStream] of streams.entries()) {
isNext = isNext.then(() => {
curStream.pipe(resultingStream, {end: i === streams.length -1})
return _finished(curStream)
}).catch((err) => {
resultingStream.write(err)
})
}
return resultingStream
}
sequenceStream([
fromList([1, 2, 3, 4]),
fromList([5, 6, 7, 8]),
fromList([9, 10])
]).pipe(process.stdout)