如何 运行 多个节点流按顺序?

How do I run multiple node streams in sequence?

给定两个流,stream1stream2,我如何按顺序 运行 它们,如果失败则抛出?

我正在寻找比这更简单的东西:

stream1.on('end',function(){
   stream2.on('end',done);
});
stream1.on('error',function(error){done(error);});
stream2.on('error',function(error){done(error);});

谢谢。

有一些方法可以做到这一点,请查看下一个 link,它将有助于如何以优雅的方式在节点中编写一些代码:

Node.js FLOW

希望它是你所需要的。

试试 Promise

function doStream1(cb) {
    // put those operation on stream1 in the callback function...    
    cb && cb();

    var p = new Promise(function(resolve, reject) {
        stream1.on( 'end', resolve );
        stream1.on( 'error', reject );
    });

    return p;
}

function doStream2(cb) {
   // some operation on stream2 on callback 
   cb && cb();

    var p = new Promise(function(resolve, reject) {
        stream2.on( 'end', resolve );
        stream2.on( 'error', reject );
    });

    return p;
}

doStream1(cb).then(function() {
   return doStream2(cb);
}).catch(function(e) {
   // error handling is here
});

根据您的用例,您可以使用 multistream 模块将两个流合并为一个。

多流由流数组构成

var MultiStream = require('multistream')
var fs = require('fs')

var streams = [
  fs.createReadStream(__dirname + '/numbers/1.txt'), // contains a single char '1'
  fs.createReadStream(__dirname + '/numbers/2.txt'), // contains a single char '2'
  fs.createReadStream(__dirname + '/numbers/3.txt')  // contains a single char '3'
]

MultiStream(streams).pipe(process.stdout) // => 123

如果组合流不适合您的用例,您可以自行构建基于结束事件发送功能的流

const fs = require('fs');

var number1 = fs.createReadStream('./numbers1.txt')
  .on('data', d => console.log(d.toString()));

var number2 = fs.createReadStream('./numbers2.txt')
  .on('data', d => console.log(d.toString()));

onEnd([number1, number2], function(err) {
  console.log('Ended with', err);
});

function onEnd(streams, cb) {
  var count = streams.length;
  var ended = 0;
  var errored = null;

  function shouldEnd() {
    ended++;

    if (errored) { return; }

    if (count == ended) {
      cb();
    }
  }

  function endWithError(err) {
    if (errored) { return; }

    errored = true;
    cb(err);
  }

  streams.forEach(s => s
    .on('end', shouldEnd)
    .on('error', endWithError)
  );
}

onEnd 函数可用于等待流数组结束,或者在为第一个发出的错误事件发出错误事件的情况下。

尝试使用 async 函数:

const { createReadStream } = require("fs")

async function main() {
  const stream1 = createReadStream(__dirname + "/1.txt")
  await pipe(stream1, process.stdout)

  const stream2 = createReadStream(__dirname + "/2.txt")
  await pipe(stream2, process.stdout)

  const stream3 = createReadStream(__dirname + "/3.txt")
  await pipe(stream3, process.stdout)
}

async function pipe(tap, sink) {
  return new Promise((resolve, reject) => {
    tap.pipe(sink, { end: false })
    tap.on("end", resolve)
    tap.on("error", reject)
  })
}

尝试下面的代码(sequenceStream 函数)。我担心的是错误处理,但它应该可以工作。 另外,如果你使用 Node < 10.* 那么你需要 end-of-stream 而不是 stream.finished

const stream  = require('stream');
const process = require('process')
const { promisify } = require('util')

const fromList = (lst) => new stream.Readable({
  read() {
    if (lst.length) {
      this.push(String(lst.shift()))
    } else {
      this.push(null)
    }
  }
})

const _finished = promisify(stream.finished)

const sequenceStream = (streams) => {
  const resultingStream = new stream.PassThrough()
  let isNext = Promise.resolve()
  for(const [i, curStream] of streams.entries()) {
    isNext = isNext.then(() => {
      curStream.pipe(resultingStream, {end: i === streams.length -1})
      return _finished(curStream)
    }).catch((err) => {
      resultingStream.write(err)
    })

  }
  return resultingStream
}

sequenceStream([
    fromList([1, 2, 3, 4]),
    fromList([5, 6, 7, 8]),
    fromList([9, 10])
]).pipe(process.stdout)