在 nodejs 中将数据从 child 管道传输到 parent

Piping data from child to parent in nodejs

我有一个 nodejs parent 进程启动另一个 nodejs child 进程。 child 进程执行一些逻辑,然后 returns 输出到 parent。输出很大,我正在尝试使用管道进行通信,正如 child.send() 方法的文档中所建议的那样(顺便说一句,效果很好)。

我希望有人建议如何正确建立这个沟通渠道。我希望能够将数据从 parent 发送到 child,并且能够将数据从 child 发送到 parent。我开始了一点,但它不完整(仅从 parent 向 child 发送消息)并抛出错误。

Parent 文件代码:

var child_process = require('child_process');

var opts = {
    stdio: [process.stdin, process.stdout, process.stderr, 'pipe']
};
var child = child_process.spawn('node', ['./b.js'], opts);

require('streamifier').createReadStream('test 2').pipe(child.stdio[3]);

Child 文件代码:

var fs =  require('fs');

// read from it
var readable = fs.createReadStream(null, {fd: 3});

var chunks = []; 

readable.on('data', function(chunk) {
    chunks.push(chunk);
});

readable.on('end', function() {
    console.log(chunks.join().toString());
})

以上代码打印预期输出 ("test 2") 以及以下错误:

events.js:85
      throw er; // Unhandled 'error' event
            ^
Error: shutdown ENOTCONN
    at exports._errnoException (util.js:746:11)
    at Socket.onSocketFinish (net.js:232:26)
    at Socket.emit (events.js:129:20)
    at finishMaybe (_stream_writable.js:484:14)
    at afterWrite (_stream_writable.js:362:3)
    at _stream_writable.js:349:9
    at process._tickCallback (node.js:355:11)
    at Function.Module.runMain (module.js:503:11)
    at startup (node.js:129:16)
    at node.js:814:3

完整答案:

Parent的代码:

var child_process = require('child_process');

var opts = {
    stdio: [process.stdin, process.stdout, process.stderr, 'pipe', 'pipe']
};
var child = child_process.spawn('node', ['./b.js'], opts);

child.stdio[3].write('First message.\n', 'utf8', function() {
    child.stdio[3].write('Second message.\n', 'utf8', function() {

    });
}); 

child.stdio[4].pipe(process.stdout);

Child的代码:

var fs =  require('fs');

// read from it
var readable = fs.createReadStream(null, {fd: 3});

readable.pipe(process.stdout);
fs.createWriteStream(null, {fd: 4}).write('Sending a message back.');

您的代码有效,但是通过使用 streamifier 包从字符串创建读取流,您的通信通道会在该字符串传输后自动关闭,这就是您收到 ENOTCONN 错误的原因。

为了能够通过流发送多条消息,请考虑对其使用 .write。您可以随时调用它:

child.stdio[3].write('First message.\n');
child.stdio[3].write('Second message.\n');

如果您想使用此方法发送多个离散消息(根据您之前使用 child.send() 的评论,我认为是这种情况),最好使用一些分隔符来能够在 child 中读取流时拆分消息。在上面的示例中,我为此使用了换行符。 event-stream.

有助于进行这种拆分的一个有用的包

现在,为了从 parent 中的 child 创建另一个通信通道,只需将另一个 'pipe' 添加到您的 stdio。

您可以在 child 中写入:

fs.createWriteStream(null, {fd: 4}).write('Sending a message back.');

并在 parent 中阅读它:

child.stdio[4].pipe(process.stdout);

这会将 'Sending a message back.' 打印到控制台。

您可以使用 fork()

我刚刚自己解决了这个问题...fork()是spawn的高级版本,一般建议使用fork()而不是spawn()

如果您使用 {silent:true} 选项,stdio 将通过管道传输到父进程

          const cp = require('child_process');

          const n = cp.fork(<path>, args, {
              cwd: path.resolve(__dirname),
              detached: true,
           });

          n.stdout.setEncoding('utf8');

          // here we can listen to the stream of data coming from the child process:
          n.stdout.on('data', (data) => {
            ee.emit('data',data);
          });

          //you can also listen to other events emitted by the child process
          n.on('error', function (err) {
            console.error(err.stack);
            ee.emit('error', err);
          });

          n.on('message', function (msg) {
            ee.emit('message', msg);
          });

          n.on('uncaughtException', function (err) {
            console.error(err.stack);
            ee.emit('error', err);
          });


          n.once('exit', function (err) {
             console.error(err.stack);
             ee.emit('exit', err);
          });

我 运行 遇到了同样的问题,并使用了 {end:false} 选项来修复错误。不幸的是,接受的答案仅在处理少量数据的离散写入时有效。如果您有大量数据(而不仅仅是短消息),您需要处理流量控制并且使用 .write() 并不是最好的。对于这种情况(大数据传输),最好使用代码中最初的 .pipe() 函数来处理流控制。

抛出错误是因为您的父进程中的可读流正试图结束并关闭您的子进程的可写流输入管道。您应该在父进程管道中使用 {end: false} 选项:

原代码: require('streamifier').createReadStream('test 2').pipe(child.stdio[3]);

建议修改: require('streamifier').createReadStream('test 2').pipe(child.stdio[3], {end:false});

在此处查看 NodeJs 文档中的详细信息:https://nodejs.org/dist/latest-v5.x/docs/api/stream.html#stream_readable_pipe_destination_options

希望这对面临此问题的其他人有所帮助。