Javascript 中的分叉任务工作流

Forking tasks workflow in Javascript

我正在做一些测试以学习在 JavaScript 中分叉不同的任务,因为我是该语言的新手。我正在尝试对格式如下的纯文本文件中的每三个数字组求和:

199
200
208
210
200
207

(199, 200, 208) 是第一组,(200, 208, 210) 是第二组,依此类推...

我从文件中读取,拆分字符串并得到我的字符串数组。现在我想在一个循环中添加,该循环分叉每次迭代(在子进程中进行求和)并打印结果数组。

parent.js

const fs = require('fs');
const { fork } = require('child_process');

const readString = fs.readFileSync('depth_readings_p2.txt', 'utf8');
const readArray = readString.split('\n');

var numArrayDef = [];

for (let i = 0; i < readArray.length - 2; i++) {
    let msg = {
        i,
        readArray
    };

    let childProcess = fork('function.js');

    childProcess.send(msg);

    childProcess.on('message', (m) => {
        console.log(m);
        numArrayDef.push(m);
    });

    console.log(numArrayDef[i]);
}

如您所见,我发送的子进程和对象包括索引、字符串数组和存储求和数的数组。父进程收到求和数并将其存储在 numArrayDef 中。

function.js

process.on('message', (msg) => {
    let num = 0;

    if ((msg.i + 2) < msg.readArray.length) {
        num += parseInt(msg.readArray[msg.i]);
        num += parseInt(msg.readArray[msg.i + 1]);
        num += parseInt(msg.readArray[msg.i + 2]);

        process.send(num);
    }

    process.exit();
});

在输出中我可以看到父级正在正确接收所有内容,但程序没有将接收到的值推送到结果数组中。另外,执行顺序很奇怪: - 首先,循环中的所有内容,但消息接收块除外。 - 其次,循环结束后的一切。 - 最后是消息接收块。

undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
[]
607
618
618
617
647
716
769
792

我知道我遗漏了一些关于分叉进程的信息,但我不知道它是什么,而且我在 fork documentation.

中也没有看到它

nodejs中你要明白的是它的asynchronious本质,代码并没有像你写的那样真正按顺序执行! (至少,很多次..)

childProcess是进程句柄,会立即返回。但是分叉进程本身可能需要一些时间才能启动。你要做的是添加一个 callback ,它会在每次收到 message 事件时执行。检查此代码:

parent.js

let childProcess = fork('function.js');

// this line is executed immedatly after the handle is created.
// You pass a newly created function to the ".on()" function which will be 
// called everytime, the child process sends a "message" event.
// you want to understand, that you just declare an anonymious `function`
// and pass it as argument. So the executed function has actually to decide
// when to call it!.
childProcess.on('message', (m) => {
    console.log('message received in parent:', m)
    console.log('closing the process')
    childProcess.kill('SIGINT')
});


childProcess.on('exit', () => {
    console.log('child is done!')
})

childProcess.send('I will come back!')

console.log('last line reached. Program still running.')

function.js

process.on('message', (msg) => {

    // wait a few seconds, and return the message!
    setTimeout(() => {
        process.send(msg)
        // wait 2000ms
    }, 2000)
}

输出

last line reached. Program still running.
message received in parent: I will come back!
closing the process
child is done!

执行顺序

  • 分叉一个进程并获取它的句柄。代码执行继续!
  • 注册 callback 监听器,这些监听器将在 messageexit 等给定事件上调用。这些实际上是asynchronious。你不知道他们什么时候开始。
  • 记录所有行已执行
  • 一段时间后,message 侦听器和随后的 exit 侦听器启动。

你的代码

您的代码基本上执行到最后(仅将处理程序添加到 process 句柄)并将记录来自 numArrayDef 当前未添加到其中的数据。因此,如果 numArrayDef[5] 处没有元素,它将默认记录 undefined

回调

因为 nodejs 默认是单线程的,通常执行一个 asynchronious 函数并传递给它一个 callback (只是另一个函数),它将在你被调用的函数时执行完成了!

固定码

parent.js

const fs = require('fs');
const { fork } = require('child_process');
const { EOL } = require('os')

const readString = fs.readFileSync('file.txt', 'utf8');
const readArray = readString.split(EOL);

var numArrayDef = [];

for (let i = 0; i < readArray.length - 2; i++) {

    // msg building. Done instantly.
    let msg = {
        i,
        readArray
    };


    // forking a childprocess. The handle is retunred immediatly
    // but starting the process may be taking some time, and 
    // the code won't wait for it!.
    let childProcess = fork('function.js');

    // this line is executed immedatly after the handle is created.
    // You add a so 
    childProcess.on('message', (m) => {
        console.log('message recevied', m)
        numArrayDef.push(m);

        // log if all numbers are done.
        if(numArrayDef.length === readArray.length -2) {
            console.log('Done. Here\'s the array:', numArrayDef)
        }

    });

    childProcess.send(msg);
}

function.js

process.on('message', (msg) => {
    let num = 0;

    if ((msg.i + 2) < msg.readArray.length) {
        num += parseInt(msg.readArray[msg.i]);
        num += parseInt(msg.readArray[msg.i + 1]);
        num += parseInt(msg.readArray[msg.i + 2]);

        process.send(num);
    }

    process.exit();
});

这应该会给你一个想法。我建议,一开始就去学习一些教程,以了解语言的本质。

你应该了解什么 nodejs

  • 了解什么是 callback
  • 必须对 async/awaitPromises 有基本的了解
  • 你应该学习一下,什么操作是sync,哪些操作是async
  • Eventemitter class也经常用
  • 学习如何处理 childprocessfork 和其他类似的东西,并不是真正需要获得对 nodejs
  • 的基本理解

函数声明

只是语法的一个插件。这些几乎完全一样。除了使用 arrow 函数样式 this 上下文将正确应用于新创建的 function:

// variant1
function abc(fn) {
    // eexcute the argument which is a function. But only after
    // a timeout!. 
    setTimeout(fn, 2000)
} 

// variant2
const abc = function(fn) {
    // eexcute the argument which is a function. But only after
    // a timeout!. 
    setTimeout(fn, 2000)
}

// variant3
const abc = (fn) => {
    // eexcute the argument which is a function. But only after
    // a timeout!. 
    setTimeout(fn, 2000)
}

// call it like so:
abc(function() { 
   console.log('I was passed!!.')
})

console.log('The abc function was called. Let\'s wait for it to call the passed function!\')