将子进程的标准输出用作另一个子进程的标准输入时,数据有时不会传递给第二个子进程

Data sometimes not passed to second child process when using the stdout of a child process as stdin of another

当使用一个子进程的标准输出作为另一个子进程的标准输入时,似乎有时数据没有传递给下一个子进程:

var spawn = require('child_process').spawn;
var pipeId = 0;

var launchProcess = function(cmd, args, stdin){
  return spawn(cmd, args, {
    stdio: [stdin ? stdin : 'ignore', 'pipe', 'pipe']
  });
};

var launch = function(){
  var task0 = launchProcess('echo', ['how\nare\nyou\ndear\nstranger']);
  var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], task0.stdout);

  pipeId++;

  task1.on('exit', launch);
};

launch();

部分文件为空:

ls -lhS /tmp/body-pipeline-*

我还尝试通过访问 task0.stdout._handle.fd 将文件描述符作为正整数传递,但问题仍然存在。

据我所知,shell 管道是这样工作的:一个进程的标准输出的相同文件描述符用作另一个进程的标准输入。我试图避免通过 NodeJS 进程传递所有数据,因为当子进程输出大量数据时,它会导致高 CPU 负载。

更新:当管道同时用于 stdin 和 stdout 时,一切都按预期工作(在这里使用 cat 来测试更长的文本):

var spawn = require('child_process').spawn;
var pipeId = 0;

var launchProcess = function(cmd, args, stdin){
  return spawn(cmd, args, {
    stdio: [stdin ? stdin : 'pipe', 'pipe', 'pipe']
  });
};

var launch = function(){
  var task0 = launchProcess('cat');
  var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId]);

  task0.stdout.pipe(task1.stdin)

  task0.stdin.write(JSON.stringify(process.env).split(',').join('\n'))
  task0.stdin.end();

  pipeId++;

  task1.on('exit', launch);
};

launch();

Update2:当使用 task0.stdout.pipe(task1.stdin) 时,脚本使用 50% CPU(与将 task0 的标准输出作为 task1 的标准输入传递时的 0% 相比):

var spawn = require('child_process').spawn;
var pipeId = 0;

var launchProcess = function(cmd, args, stdin, stdout, stderr){
  return spawn(cmd, args, {
    stdio: [stdin, stdout, stderr]
  });
};

var launch = function(){
  var task0 = launchProcess('yes', ['lala'], 'ignore', 'pipe', 'ignore');
  var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], 'pipe', 'ignore', 'ignore');
  // var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], task0.stdout, 'ignore', 'ignore');


  task0.stdout.pipe(task1.stdin);

  pipeId++;

  task1.on('exit', launch);
};

launch();

Update3:这更能说明我的问题。我试图在原始代码中对其进行简化,但我认为它过于简化了。 Larry Turtis 为简化案例提供了一个解决方法,但不适用于我的案例:

var spawn = require('child_process').spawn;

var pipeId = 0;
var pipeSlots = 6;

var launchProcess = function(cmd, args, stdin, stdout){
  return spawn(cmd, args, {
    stdio: [stdin, stdout, 'ignore']
  });
};

var launch = function(){
  var task0 = launchProcess('echo', ['how\nare\nyou\ndear\nstranger'], 'ignore', 'pipe');
  var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], task0.stdout, 'ignore');

  task0.on('error', function(err){
    console.log('Error while processing task0:' + err.stack);
  });
  task1.on('error', function(err){
    console.log('Error while processing task1:' + err.stack);
  });

  pipeId++;
};

// Simulating message queue
setInterval(function(){
  // Simulating how many messages we get from the messaging queue
  var mqMessageCount = Math.floor(Math.random() * (pipeSlots + 1));

  for(var i = 0; i < mqMessageCount; i++){
    launch();
  }
}, 250); // For this test we assume that pipes finish under 250ms

如果您不等待第二个进程退出,您的原始代码可以正常工作。

var launch = function(){
  var task0 = launchProcess('echo', ['how\nare\nyou\ndear\nstranger']);
  var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], task0.stdout);

  pipeId++;

  launch();
};

可能发生的情况是任务 1 已完成,但任务 0 未完成。我不是 100% 清楚为什么这很重要,但它显然很重要。可能与节点 documentation 中我们注意到的事实有关:

..when the 'exit' event is triggered, child process stdio streams might still be open.

确保完成这两项任务即可解决问题。

var spawn = require('child_process').spawn;
var q = require("q");
var pipeId = 0;

var launchProcess = function(cmd, args, stdin) {
    return spawn(cmd, args, {
        stdio: [stdin ? stdin : 'ignore', 'pipe', 'pipe']
    });
};

var launch = function() {
    var task0 = launchProcess('echo', ['how\nare\nyou\ndear\nstranger']);
    var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], task0.stdout);

    var p0 = q.defer();
    var p1 = q.defer();
    task0.on('exit', p0.resolve);
    task1.on('exit',p1.resolve);

    q.all(p0, p1).then(launch)

    pipeId++;
};

launch();

现在这是一个已知的 NodeJS 问题:https://github.com/nodejs/node/issues/9413

TLDR;我的一位同事提出了解决此问题的好主意:

var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], 'pipe', 'ignore');
var task0 = launchProcess('echo', ['how\nare\nyou\ndear\nstranger'], 'ignore', task1.stdin);

想法是在启动发送任务之前启动接收任务!