将子进程的标准输出用作另一个子进程的标准输入时,数据有时不会传递给第二个子进程
Data sometimes not passed to second child process when using the stdout of a child process as stdin of another
当使用一个子进程的标准输出作为另一个子进程的标准输入时,似乎有时数据没有传递给下一个子进程:
var spawn = require('child_process').spawn;
var pipeId = 0;
var launchProcess = function(cmd, args, stdin){
return spawn(cmd, args, {
stdio: [stdin ? stdin : 'ignore', 'pipe', 'pipe']
});
};
var launch = function(){
var task0 = launchProcess('echo', ['how\nare\nyou\ndear\nstranger']);
var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], task0.stdout);
pipeId++;
task1.on('exit', launch);
};
launch();
部分文件为空:
ls -lhS /tmp/body-pipeline-*
我还尝试通过访问 task0.stdout._handle.fd
将文件描述符作为正整数传递,但问题仍然存在。
据我所知,shell 管道是这样工作的:一个进程的标准输出的相同文件描述符用作另一个进程的标准输入。我试图避免通过 NodeJS 进程传递所有数据,因为当子进程输出大量数据时,它会导致高 CPU 负载。
更新:当管道同时用于 stdin 和 stdout 时,一切都按预期工作(在这里使用 cat 来测试更长的文本):
var spawn = require('child_process').spawn;
var pipeId = 0;
var launchProcess = function(cmd, args, stdin){
return spawn(cmd, args, {
stdio: [stdin ? stdin : 'pipe', 'pipe', 'pipe']
});
};
var launch = function(){
var task0 = launchProcess('cat');
var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId]);
task0.stdout.pipe(task1.stdin)
task0.stdin.write(JSON.stringify(process.env).split(',').join('\n'))
task0.stdin.end();
pipeId++;
task1.on('exit', launch);
};
launch();
Update2:当使用 task0.stdout.pipe(task1.stdin)
时,脚本使用 50% CPU(与将 task0 的标准输出作为 task1 的标准输入传递时的 0% 相比):
var spawn = require('child_process').spawn;
var pipeId = 0;
var launchProcess = function(cmd, args, stdin, stdout, stderr){
return spawn(cmd, args, {
stdio: [stdin, stdout, stderr]
});
};
var launch = function(){
var task0 = launchProcess('yes', ['lala'], 'ignore', 'pipe', 'ignore');
var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], 'pipe', 'ignore', 'ignore');
// var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], task0.stdout, 'ignore', 'ignore');
task0.stdout.pipe(task1.stdin);
pipeId++;
task1.on('exit', launch);
};
launch();
Update3:这更能说明我的问题。我试图在原始代码中对其进行简化,但我认为它过于简化了。 Larry Turtis 为简化案例提供了一个解决方法,但不适用于我的案例:
var spawn = require('child_process').spawn;
var pipeId = 0;
var pipeSlots = 6;
var launchProcess = function(cmd, args, stdin, stdout){
return spawn(cmd, args, {
stdio: [stdin, stdout, 'ignore']
});
};
var launch = function(){
var task0 = launchProcess('echo', ['how\nare\nyou\ndear\nstranger'], 'ignore', 'pipe');
var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], task0.stdout, 'ignore');
task0.on('error', function(err){
console.log('Error while processing task0:' + err.stack);
});
task1.on('error', function(err){
console.log('Error while processing task1:' + err.stack);
});
pipeId++;
};
// Simulating message queue
setInterval(function(){
// Simulating how many messages we get from the messaging queue
var mqMessageCount = Math.floor(Math.random() * (pipeSlots + 1));
for(var i = 0; i < mqMessageCount; i++){
launch();
}
}, 250); // For this test we assume that pipes finish under 250ms
如果您不等待第二个进程退出,您的原始代码可以正常工作。
var launch = function(){
var task0 = launchProcess('echo', ['how\nare\nyou\ndear\nstranger']);
var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], task0.stdout);
pipeId++;
launch();
};
可能发生的情况是任务 1 已完成,但任务 0 未完成。我不是 100% 清楚为什么这很重要,但它显然很重要。可能与节点 documentation 中我们注意到的事实有关:
..when the 'exit' event is triggered, child process stdio streams might still be open.
确保完成这两项任务即可解决问题。
var spawn = require('child_process').spawn;
var q = require("q");
var pipeId = 0;
var launchProcess = function(cmd, args, stdin) {
return spawn(cmd, args, {
stdio: [stdin ? stdin : 'ignore', 'pipe', 'pipe']
});
};
var launch = function() {
var task0 = launchProcess('echo', ['how\nare\nyou\ndear\nstranger']);
var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], task0.stdout);
var p0 = q.defer();
var p1 = q.defer();
task0.on('exit', p0.resolve);
task1.on('exit',p1.resolve);
q.all(p0, p1).then(launch)
pipeId++;
};
launch();
现在这是一个已知的 NodeJS 问题:https://github.com/nodejs/node/issues/9413
TLDR;我的一位同事提出了解决此问题的好主意:
var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], 'pipe', 'ignore');
var task0 = launchProcess('echo', ['how\nare\nyou\ndear\nstranger'], 'ignore', task1.stdin);
想法是在启动发送任务之前启动接收任务!
当使用一个子进程的标准输出作为另一个子进程的标准输入时,似乎有时数据没有传递给下一个子进程:
var spawn = require('child_process').spawn;
var pipeId = 0;
var launchProcess = function(cmd, args, stdin){
return spawn(cmd, args, {
stdio: [stdin ? stdin : 'ignore', 'pipe', 'pipe']
});
};
var launch = function(){
var task0 = launchProcess('echo', ['how\nare\nyou\ndear\nstranger']);
var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], task0.stdout);
pipeId++;
task1.on('exit', launch);
};
launch();
部分文件为空:
ls -lhS /tmp/body-pipeline-*
我还尝试通过访问 task0.stdout._handle.fd
将文件描述符作为正整数传递,但问题仍然存在。
据我所知,shell 管道是这样工作的:一个进程的标准输出的相同文件描述符用作另一个进程的标准输入。我试图避免通过 NodeJS 进程传递所有数据,因为当子进程输出大量数据时,它会导致高 CPU 负载。
更新:当管道同时用于 stdin 和 stdout 时,一切都按预期工作(在这里使用 cat 来测试更长的文本):
var spawn = require('child_process').spawn;
var pipeId = 0;
var launchProcess = function(cmd, args, stdin){
return spawn(cmd, args, {
stdio: [stdin ? stdin : 'pipe', 'pipe', 'pipe']
});
};
var launch = function(){
var task0 = launchProcess('cat');
var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId]);
task0.stdout.pipe(task1.stdin)
task0.stdin.write(JSON.stringify(process.env).split(',').join('\n'))
task0.stdin.end();
pipeId++;
task1.on('exit', launch);
};
launch();
Update2:当使用 task0.stdout.pipe(task1.stdin)
时,脚本使用 50% CPU(与将 task0 的标准输出作为 task1 的标准输入传递时的 0% 相比):
var spawn = require('child_process').spawn;
var pipeId = 0;
var launchProcess = function(cmd, args, stdin, stdout, stderr){
return spawn(cmd, args, {
stdio: [stdin, stdout, stderr]
});
};
var launch = function(){
var task0 = launchProcess('yes', ['lala'], 'ignore', 'pipe', 'ignore');
var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], 'pipe', 'ignore', 'ignore');
// var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], task0.stdout, 'ignore', 'ignore');
task0.stdout.pipe(task1.stdin);
pipeId++;
task1.on('exit', launch);
};
launch();
Update3:这更能说明我的问题。我试图在原始代码中对其进行简化,但我认为它过于简化了。 Larry Turtis 为简化案例提供了一个解决方法,但不适用于我的案例:
var spawn = require('child_process').spawn;
var pipeId = 0;
var pipeSlots = 6;
var launchProcess = function(cmd, args, stdin, stdout){
return spawn(cmd, args, {
stdio: [stdin, stdout, 'ignore']
});
};
var launch = function(){
var task0 = launchProcess('echo', ['how\nare\nyou\ndear\nstranger'], 'ignore', 'pipe');
var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], task0.stdout, 'ignore');
task0.on('error', function(err){
console.log('Error while processing task0:' + err.stack);
});
task1.on('error', function(err){
console.log('Error while processing task1:' + err.stack);
});
pipeId++;
};
// Simulating message queue
setInterval(function(){
// Simulating how many messages we get from the messaging queue
var mqMessageCount = Math.floor(Math.random() * (pipeSlots + 1));
for(var i = 0; i < mqMessageCount; i++){
launch();
}
}, 250); // For this test we assume that pipes finish under 250ms
如果您不等待第二个进程退出,您的原始代码可以正常工作。
var launch = function(){
var task0 = launchProcess('echo', ['how\nare\nyou\ndear\nstranger']);
var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], task0.stdout);
pipeId++;
launch();
};
可能发生的情况是任务 1 已完成,但任务 0 未完成。我不是 100% 清楚为什么这很重要,但它显然很重要。可能与节点 documentation 中我们注意到的事实有关:
..when the 'exit' event is triggered, child process stdio streams might still be open.
确保完成这两项任务即可解决问题。
var spawn = require('child_process').spawn;
var q = require("q");
var pipeId = 0;
var launchProcess = function(cmd, args, stdin) {
return spawn(cmd, args, {
stdio: [stdin ? stdin : 'ignore', 'pipe', 'pipe']
});
};
var launch = function() {
var task0 = launchProcess('echo', ['how\nare\nyou\ndear\nstranger']);
var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], task0.stdout);
var p0 = q.defer();
var p1 = q.defer();
task0.on('exit', p0.resolve);
task1.on('exit',p1.resolve);
q.all(p0, p1).then(launch)
pipeId++;
};
launch();
现在这是一个已知的 NodeJS 问题:https://github.com/nodejs/node/issues/9413
TLDR;我的一位同事提出了解决此问题的好主意:
var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], 'pipe', 'ignore');
var task0 = launchProcess('echo', ['how\nare\nyou\ndear\nstranger'], 'ignore', task1.stdin);
想法是在启动发送任务之前启动接收任务!