JavaScript 中的 Worker(工作线程)并没有那么快

Workers in javascript not so fast

我正在尝试 JS 中的 Worker(工作线程),并用同一个 JS 排序函数做了一个简单的排序比较。这个比较只是用异步函数对 60000 个随机数进行排序。第一个函数按照传统方式(在当前线程中)对随机数进行排序。

/**
 * Sorts numbers in ascending order on the calling thread.
 *
 * The sort itself runs synchronously inside the call; the function is `async`
 * only so that callers receive a promise they can chain `.then()` on.
 *
 * @param {number[]} arr - Values to sort. Left untouched: a copy is sorted.
 * @returns {Promise<number[]>} Resolves with a new array holding the sorted values.
 */
async function normalSort(arr) {
    // `let copy = arr` merely aliased the input, so the caller's array used to
    // be sorted in place. `slice()` makes the copy the variable name promises.
    const copy = arr.slice();
    // `a - b` yields 0 for equal values; the previous `a > b ? 1 : -1` never
    // did, which makes the comparator inconsistent whenever duplicates exist.
    copy.sort((a, b) => a - b);
    return copy;
}

另一个是运行在 Worker 中的普通函数,它所在的文件由 workersHandler 函数通过 Worker 启动

const { Worker, parentPort, workerData } = require('worker_threads');

/**
 * Worker-side routine: orders the received values ascending (in place),
 * posts the ordered array to the parent port and then calls `process.exit()`.
 *
 * @param {number[]} data - Values handed to this worker through `workerData`.
 */
function sort(data) {
    const ascending = (left, right) => (left > right ? 1 : -1);
    // `Array.prototype.sort` returns the very array it sorted, so the sorted
    // input itself is what gets posted.
    parentPort.postMessage(data.sort(ascending));
    process.exit();
}


// Runs as soon as the worker loads this file.
sort(workerData);

Worker 处理函数

const os = require('os');
const path = require('path');
const { Worker } = require('worker_threads');

/**
 * Splits `arr` into at most one chunk per CPU core and hands every chunk to
 * its own worker thread running `../utils/sort.js`.
 *
 * @param {Array} arr - Values to process. Not modified: each chunk is a slice.
 * @returns {Promise<Array[]>} Resolves with one entry per chunk, in chunk
 *     order, each being the first message its worker posted. The chunks are
 *     NOT merged into a single array. Rejects when a worker emits `error` or
 *     stops with a non-zero exit code.
 */
async function workersHandler(arr) {
    const cpusAmount = os.cpus().length;
    const chSize = Math.ceil(arr.length / cpusAmount);
    const promises = [];
    for (let i = 0; i < arr.length; i += chSize) {
        const currentChunk = arr.slice(i, i + chSize);
        promises.push(new Promise((res, rej) => {
            //@ts-ignore
            const worker = new Worker(path.join(__dirname, '..', '/utils/sort.js'), { workerData: currentChunk });

            worker.on('message', res);
            worker.on('error', rej);
            // Without this, a worker that stops abnormally before posting a
            // result would leave the promise (and the caller) pending forever.
            // Rejecting after a message already resolved the promise is a no-op.
            worker.on('exit', (code) => {
                if (code !== 0) {
                    rej(new Error(`Worker stopped with exit code ${code}`));
                }
            });
        }));
    }
    return Promise.all(promises);
}

以及调用上述函数的主函数

/**
 * Builds 60000 random integers between 0 and 100 and starts both sorting
 * strategies, logging how long after a shared start timestamp each finishes.
 */
function main() {
    const arr = Array.from({ length: 60000 }, () => Math.round(Math.random() * 100));
    const startTime = Date.now();
    // Produces a completion callback that prints the elapsed time under `label`.
    const report = (label) => () => console.log(label, Date.now() - startTime + ' ms');

    workersHandler(arr).then(report('workers sort'));
    normalSort(arr).then(report('normal sort'));
}
main();

令人惊讶的是,普通排序函数反而更快,而且它只在一个线程中运行。Worker 版本耗时 101 毫秒,普通排序函数耗时 53 毫秒。有人能解释为什么会出现这种奇怪的结果吗?是 Worker 本身没那么快,还是我的实现有问题?

基本上,使用单个工作线程并等待它完成工作总是比在本地线程中完成工作慢,因为:

  • 创建线程需要时间。
  • 在线程之间发送数据需要时间。

如果您有可以并行处理的、相互独立的工作,并且有多个 CPU 核心可用,那么您就可能从中获益。在这种情况下,您可以把不同的工作分配给多个 Worker(数量最多与可用的 CPU 核心数相同),前提是这些工作不受它们都要争用的某个单一资源的限制。

下面我贴出了一个程序,它分别在本地线程和通过 Worker 对 12 个数组进行排序,并重复进行多轮比赛。(通过 Worker 排序时,它把数组数据转移(transfer)给 Worker 再转移回来,而不是复制。)它会提前启动 Worker 并重复使用它们,但在计算 Worker 完成工作的平均耗时时,会把启动 Worker 所花的时间也算进去,因此所有开销都被计入了。

在我的工作站上(有四个 CPU 核心,并让程序为每个核心分配一个 Worker),Worker 轻松获胜:

# of workers:     4
Local average:    8790.010573029518ms
Workers' average: 3550.658817946911ms
Workers win, taking 40.39425% of the time local did

但是,如果我把它限制为只有一个 Worker,那么 Worker 就纯粹是开销,本地线程获胜:

# of workers:     1
Local average:    8907.022233068943ms
Workers' average: 8953.339844942093ms
Local wins, taking 99.48268% of the time workers did

即使只有两个 Worker 也能获胜,因为它们可以在这台多核机器上并行工作:

# of workers:     2
Local average:    8782.853852927685ms
Workers' average: 4754.60275799036ms
Workers win, taking 54.13505% of the time local did

在单核机器上(如果你还能找到一台的话),这两个 Worker 又会变成纯粹的开销,本地线程将获胜。

这里是main.js

const os = require('os');
const { Worker } = require('worker_threads');
const { performance } = require('perf_hooks');

// Largest value an unsigned 32-bit integer can hold; used to scale the
// Math.random() output when filling the arrays.
const MAX_UINT32 = (2**32)-1;
// Value passed to createRandomArray() as the length of each random array.
const ARRAY_SIZE = 100000;
// Number of arrays sorted by each contender in a single race.
const ARRAY_COUNT = 12;
// Optional 1st CLI argument: number of workers. Falls back to the number of
// CPU cores when the argument is missing, not numeric, or 0.
const workerCount = +process.argv[2] || os.cpus().length;
// Optional 2nd CLI argument: number of races. Falls back to 5 when the
// argument is missing, not numeric, or 0.
const raceCount = +process.argv[3] || 5;

/**
 * A fixed pool of workers that lends each worker to one caller at a time.
 *
 * Callers obtain a worker with `get()` (which waits when none is idle) and
 * must hand it back with `release()` once they are done with it.
 */
class WorkerQueue {
    /** Every worker owned by this pool. */
    #workers;
    /** Idle workers, in the order they will be handed out. */
    #available;
    /** `resolve` callbacks of `get()` calls still waiting for a worker (FIFO). */
    #pending;
    // Hands the oldest idle worker to the oldest waiting `get()` call, if both
    // exist. Kept as an arrow-function field because private methods were still
    // behind a flag (Node v13) when this was written.
    #checkPending = () => {
        if (this.#available.length && this.#pending.length) {
            const resolve = this.#pending.shift();
            const worker = this.#available.shift();
            resolve(worker);
        }
    };

    /**
     * @param {...object} workers - The workers to pool; all start out idle.
     */
    constructor(...workers) {
        this.#workers = new Set(workers);
        this.#available = [...this.#workers];
        this.#pending = [];
    }

    /**
     * Requests a worker.
     *
     * @returns {Promise<object>} Resolves with an idle worker as soon as one is
     *     available; requests are served in the order they were made.
     */
    get() {
        return new Promise(resolve => {
            this.#pending.push(resolve);
            this.#checkPending();
        });
    }

    /**
     * Returns a worker to the pool and passes it on to the next waiting
     * `get()` call, if any.
     *
     * @param {object} worker - A worker previously obtained from `get()`.
     * @throws {Error} If the worker does not belong to this pool, or if it is
     *     already idle (released twice).
     */
    release(worker) {
        if (!this.#workers.has(worker)) {
            throw new Error("Unknown worker");
        }
        // A second release would queue the worker twice, letting two callers
        // use the same worker at the same time.
        if (this.#available.includes(worker)) {
            throw new Error("Worker already released");
        }
        this.#available.push(worker);
        this.#checkPending();
    }

    /**
     * Calls `terminate()` on every pooled worker and empties the pool.
     * `get()` calls that are still waiting are dropped and never resolve.
     */
    terminate() {
        for (const worker of this.#workers) {
            worker.terminate();
        }
        this.#workers = new Set();
        this.#available = [];
        this.#pending = [];
    }
}

// Create the worker pool once, before any race, and remember how long the
// creation took so it can be charged to the workers' result later.
const {workers, workerCreationTime} = createWorkers();

// Start the benchmark. (`main` and `createWorkers` are function declarations
// further down the file, so they are hoisted and callable here.)
main();

/**
 * Starts `workerCount` workers running `./worker.js`, pools them in a
 * `WorkerQueue` and measures how long that set-up took.
 *
 * @returns {{workers: WorkerQueue, workerCreationTime: number}} The pool and
 *     the elapsed `performance.now()` difference for creating it.
 */
function createWorkers() {
    const spawn = () => new Worker("./worker.js");
    const startedAt = performance.now();
    const threads = Array.from({length: workerCount}, spawn);
    const workers = new WorkerQueue(...threads);
    const workerCreationTime = performance.now() - startedAt;
    return {workers, workerCreationTime};
}

/**
 * Runs `raceCount` races between local sorting and worker sorting, then
 * prints the average time per race for each contender and names the winner.
 *
 * Errors are logged rather than rethrown, and the worker pool is terminated
 * whether or not the races succeeded.
 *
 * @returns {Promise<void>}
 */
async function main() {
    try {
        console.log(`Workers: ${workerCount} (in ${workerCreationTime}ms), races: ${raceCount}`);
        let localTotal = 0;
        let workersTotal = 0;
        for (let n = 1; n <= raceCount; ++n) {
            console.log(`Race #${n}:`);
            const {localTime, workersTime} = await sortRace();
            localTotal += localTime;
            workersTotal += workersTime;
        }
        // The figures are reported as averages, so the accumulated totals have
        // to be divided by the number of races.
        const localAverage = localTotal / raceCount;
        // Include the time it took to create the workers in the workers' average, as
        // though we'd created them for each race. (We didn't because doing so would
        // have given the local thread an advantage: after the first race, it's warmed
        // up, but a new worker would be cold. So we let the workers be warm but add
        // the full creation time into each race.) Adding it once to the per-race
        // average is the same as adding it to every single race.
        const workersAverage = workersTotal / raceCount + workerCreationTime;
        console.log("----");
        console.log(`# of workers:     ${workerCount}`);
        console.log(`Local average:    ${localAverage}ms`);
        console.log(`Workers' average: ${workersAverage}ms`);
        if (localAverage > workersAverage) {
            showWinner("Workers win", "local", workersAverage, localAverage);
        } else {
            showWinner("Local wins", "workers", localAverage, workersAverage);
        }
    } catch (e) {
        console.error(e.message, e.stack);
    } finally {
        // Terminate even after a failed race; otherwise the still-running
        // workers would keep the process alive.
        workers.terminate();
    }
}

/**
 * Logs the winner line: the winner's time expressed as a percentage of the
 * loser's time, with five decimal places.
 *
 * @param {string} msg - Text announcing the winner.
 * @param {string} loser - Label of the slower contender.
 * @param {number} winnerAverage - Time of the winner.
 * @param {number} loserAverage - Time of the loser.
 */
function showWinner(msg, loser, winnerAverage, loserAverage) {
    const scaled = winnerAverage * 100;
    const percentage = scaled / loserAverage;
    console.log(`${msg}, taking ${percentage.toFixed(5)}% of the time ${loser} did`);
}

/**
 * Runs one race: sorts `ARRAY_COUNT` random arrays locally, then sorts copies
 * of the same data through the workers, validating and logging both results.
 *
 * @returns {Promise<{localTime: number, workersTime: number}>} Elapsed
 *     `performance.now()` differences for the local and the worker run.
 */
async function sortRace() {
    // Times how long it takes until every array has gone through `sorter`.
    const measure = async (inputs, sorter) => {
        const begin = performance.now();
        const sorted = await Promise.all(inputs.map(sorter));
        return {sorted, elapsed: performance.now() - begin};
    };

    // Inputs for the local run...
    const localArrays = Array.from({length: ARRAY_COUNT}, () => createRandomArray(ARRAY_SIZE));
    // ...and copies for the workers, taken before the local run sorts the
    // originals in place, so both contenders start from identical values.
    const workerArrays = localArrays.map(source => new Uint32Array(source));

    const {sorted: localResults, elapsed: localTime} = await measure(localArrays, sortLocal);
    checkResults(localResults);
    console.log(`Local time:    ${localTime}ms`);

    const {sorted: workersResults, elapsed: workersTime} = await measure(workerArrays, sortViaWorker);
    checkResults(workersResults);
    console.log(`Workers' time: ${workersTime}ms`);

    return {localTime, workersTime};
}

/**
 * Sorts `array` numerically ascending, in place, on the calling thread.
 *
 * @param {Array|Uint32Array} array - The values to sort; this object is mutated.
 * @returns {Promise<Array|Uint32Array>} Resolves with the same `array` object.
 */
async function sortLocal(array) {
    const ascending = (left, right) => left - right;
    // Yield once so the actual sorting starts asynchronously, as it does in
    // `sortViaWorker`.
    await Promise.resolve();
    // `sort` returns the array it was called on.
    return array.sort(ascending);
}

/**
 * Sorts `array` in a pooled worker.
 *
 * Waits for an idle worker, posts the array to it with its buffer in the
 * transfer list, and releases the worker when the reply arrives.
 *
 * @param {Uint32Array} array - Values to sort; its buffer is handed over to
 *     the worker.
 * @returns {Promise<Uint32Array>} Resolves with the `array` property of the
 *     worker's reply message.
 */
async function sortViaWorker(array) {
    const worker = await workers.get();
    return new Promise(resolve => {
        const onReply = reply => {
            // Free the worker first so a waiting task can take it right away.
            workers.release(worker);
            resolve(reply.array);
        };
        worker.once("message", onReply);
        worker.postMessage({array}, [array.buffer]);
    });
}

/**
 * Verifies that every array is in non-descending order.
 *
 * @param {Iterable<Array|Uint32Array>} arrays - The arrays to verify.
 * @throws {Error} On the first entry that is smaller than its predecessor.
 */
function checkResults(arrays) {
    for (const array of arrays) {
        // Locate the first entry that is smaller than the one before it.
        let badIndex = -1;
        for (let i = 1; i < array.length; i += 1) {
            if (array[i - 1] > array[i]) {
                badIndex = i;
                break;
            }
        }
        if (badIndex === -1) {
            continue;
        }
        throw new Error(
            `Error, array entry ${badIndex} has value ${array[badIndex]} ` +
            `which is > previous value ${array[badIndex-1]}`
        );
    }
}

/**
 * Creates a `Uint32Array` with `length` elements and fills it with random
 * values via `randomFillArray`.
 *
 * @param {number} length - Number of elements the array should hold.
 * @returns {Uint32Array} Whatever `randomFillArray` returns for the new array.
 */
function createRandomArray(length) {
    // The typed-array constructor takes an element count, not a byte count.
    // Multiplying by BYTES_PER_ELEMENT made every array four times longer
    // than requested.
    const array = new Uint32Array(length);
    return randomFillArray(array);
}

/**
 * Overwrites every element of `array` with `Math.random() * MAX_UINT32`.
 *
 * @param {Array|Uint32Array} array - The array to fill; mutated in place.
 * @returns {Array|Uint32Array} The same `array` object.
 */
function randomFillArray(array) {
    const count = array.length;
    let index = 0;
    while (index < count) {
        array[index] = Math.random() * MAX_UINT32;
        index += 1;
    }
    return array;
}

worker.js:

const { parentPort } = require("worker_threads");

// For every incoming request: sort the received typed array numerically in
// place, then post it back with its buffer in the transfer list.
parentPort.on("message", (message) => {
    const sorted = message.array.sort((left, right) => left - right);
    parentPort.postMessage({array: sorted}, [sorted.buffer]);
});

https://nodejs.org/api/worker_threads.html#worker_threads_port_postmessage_value_transferlist and https://developer.mozilla.org/en-US/docs/Web/API/Worker/postMessage:

postMessage(value[, transferList])
node: transferList may be a list of ArrayBuffer and MessagePort objects. After transferring, they will not be usable on the sending side of the channel anymore (even if they are not contained in value). MDN: An optional array of Transferable objects to transfer ownership of. If the ownership of an object is transferred, it becomes unusable (neutered) in the context it was sent from and becomes available only to the worker it was sent to. Transferable objects are instances of classes like ArrayBuffer, MessagePort or ImageBitmap objects that can be transferred.

类型的影响:

// Browser benchmark: sorts four chunks sequentially on the main thread, then
// sorts copies of the same chunks in four Web Workers, and logs both timings.

// Ask which array type to benchmark and how many elements it should hold.
let typ=prompt("Type: 0/1/2/3 (Array/Float64Array/Float32Array/Uint32Array)");
let len=parseInt(prompt("Length"));
let basearray;
// Any answer other than "1", "2" or "3" falls through to a plain Array.
switch(typ){
  case "1":basearray=new Float64Array(len);break;
  case "2":basearray=new Float32Array(len);break;
  case "3":basearray=new Uint32Array(len);break;
  default: basearray=new Array(len);break;
}
// Fill the base array with random values below 0x1000000.
for(let i=0;i<basearray.length;i++)
  basearray[i]=Math.random()*0x1000000;

// Split the data into `cpus` chunks: `chunks[i]` is sorted sequentially,
// `chunksw[i]` is an independent copy that is sent to worker i.
// NOTE: `chunksize` is not rounded, so `len` should be divisible by `cpus`.
let cpus=4,
    chunksize=basearray.length/cpus,
    chunks=[],chunksw=[];
for(let i=0;i<cpus;i++)
  chunksw[i]=(chunks[i]=basearray.slice(i*chunksize,(i+1)*chunksize)).slice();

// Sequential measurement: sort every chunk on this thread.
let start=Date.now();
for(let i=0;i<cpus;i++)
  chunks[i].sort((a,b)=>a-b);
console.log("Seq:",Date.now()-start);

// Worker source: sort whatever arrives and post the sorted data back.
let code="onmessage=event=>postMessage(event.data.sort((a,b)=>a-b));";
// `cnt` counts how many workers have replied so far.
let ws=[],cnt=0;
for(let i=0;i<cpus;i++){
  // Each worker is created from the source string above via a data: URL.
  ws[i]=new Worker("data:text/plain,"+escape(code));
  let j=i;
  ws[i].onmessage=event=>{
    // Keep the sorted chunk; once all workers have replied, log the elapsed time.
    chunksw[j]=event.data;
    if(++cnt===cpus){
      console.log("Par:",Date.now()-start);
      // For tiny inputs, print both results side by side for verification.
      if(len<=20)
        for(let i=0;i<cpus;i++)
          console.log(chunks[i],chunksw[i]);
    }
  };
}
// Restart the clock only now, so creating the workers above is not part of
// the "Par" timing, then hand one chunk to each worker (no transfer list).
start=Date.now();
for(let i=0;i<cpus;i++)
  ws[i].postMessage(chunksw[i]);

请指定一个能被 4 整除的长度。如果长度不超过 20,排序后的各个分块也会被打印出来,以便验证。在我这里,无论包含 20 个还是 6000000 个元素,JS 数组(Array)在传给 Worker 时确实更慢(与不使用线程的运行相比);不过 600 万个元素的 JS 数组在我的旧笔记本电脑上要运行 8 秒,所以从较小的规模开始尝试可能更稳妥。其他类型在使用线程时更快,其中 Uint 最快。
实际上,任何不是 1/2/3 的输入(包括空字符串)都会得到 JS 数组(最慢的一种)。

传输(transfer)带来的效果没有那么惊人,但从很小的规模开始就已经显现出来(在我的电脑上,4 个元素时是 59-69 毫秒对比 20-22 毫秒):

// Browser benchmark: sorts four typed-array chunks in Web Workers twice —
// first posting the arrays without a transfer list, then posting their
// buffers with a transfer list — and logs both timings.

// Ask which typed-array type to benchmark and how many elements it should hold.
let typ=prompt("Type: 0/1/2 (Float64Array/Float32Array/Uint32Array)");
let len=parseInt(prompt("Length"));
let basearray;
// Any answer other than "1" or "2" falls through to Float64Array.
switch(typ){
  case "1":basearray=new Float32Array(len);break;
  case "2":basearray=new Uint32Array(len);break;
  default:basearray=new Float64Array(len);
}
// Fill the base array with random values below 0x1000000.
for(let i=0;i<basearray.length;i++)
  basearray[i]=Math.random()*0x1000000;

// Split the data into `cpus` chunks: `chunksw[i]` is used for the
// non-transfer run, `chunkswt[i]` is an independent copy for the transfer run.
// NOTE: `chunksize` is not rounded, so `len` should be divisible by `cpus`.
let cpus=4,
    chunksize=basearray.length/cpus,
    chunksw=[],chunkswt=[];
for(let i=0;i<cpus;i++)
  chunkswt[i]=(chunksw[i]=basearray.slice(i*chunksize,(i+1)*chunksize)).slice();

// Start timestamp of whichever measurement is currently running.
let start;
// Non-transfer worker source: sort the received array and post it back.
let code="onmessage=event=>postMessage(event.data.sort((a,b)=>a-b));";
// `cnt` counts the replies of the measurement currently running.
let ws=[],cnt=0;
for(let i=0;i<cpus;i++){
  ws[i]=new Worker("data:text/plain,"+escape(code));
  let j=i;
  ws[i].onmessage=event=>{
    chunksw[j]=event.data;
    if(++cnt===cpus){
      console.log("Non-transfer:",Date.now()-start);
      // launch transfer measurement
      // (`wst` is declared further down; it is initialised by the time this
      // handler runs, because replies only arrive after the script finished.)
      cnt=0;start=Date.now();
      for(let i=0;i<cpus;i++)
        wst[i].postMessage(chunkswt[i].buffer,[chunkswt[i].buffer]);    }
  };
}

// Transfer worker source. Only the ArrayBuffer is posted, so the worker wraps
// it in a typed array of the selected type, sorts through that view, and posts
// the buffer back with itself in the transfer list.
let codet;
switch(typ){
  case "1":
    codet="onmessage=event=>{"+
          "let arr=new Float32Array(event.data);"+
          "arr.sort((a,b)=>a-b);"+
          "postMessage(event.data,[event.data]);};";
    break;
  case "2":
    codet="onmessage=event=>{"+
          "let arr=new Uint32Array(event.data);"+
          "arr.sort((a,b)=>a-b);"+
          "postMessage(event.data,[event.data]);};";
    break;
  default:
    codet="onmessage=event=>{"+
          "let arr=new Float64Array(event.data);"+
          "arr.sort((a,b)=>a-b);"+
          "postMessage(event.data,[event.data]);};";
}
// Workers used by the transfer measurement.
let wst=[];
for(let i=0;i<cpus;i++){
  wst[i]=new Worker("data:text/plain,"+escape(codet));
  let j=i;
  wst[i].onmessage=event=>{
    // The reply is a bare buffer again; re-wrap it in the selected type.
    switch(typ){
      case "1":chunkswt[j]=new Float32Array(event.data);break;
      case "2":chunkswt[j]=new Uint32Array(event.data);break;
      default:chunkswt[j]=new Float64Array(event.data);
    }
    if(++cnt===cpus){
      console.log("Transfer:",Date.now()-start);
      // For tiny inputs, print both results side by side for verification.
      if(len<=20)
        for(let i=0;i<cpus;i++)
          console.log(chunksw[i],chunkswt[i]);
    }
  };
}

// launch non-transfer measurement
start=Date.now();
for(let i=0;i<cpus;i++)
  ws[i].postMessage(chunksw[i]);

这段代码有点乱,因为可以传输的是缓冲区(buffer),而不是类型化数组本身;此外,第二次测量的初始化部分是直接复制粘贴来的(这本身就不太漂亮),而它又是从第一次测量的完成回调中启动的。

(我不想给出精确的测量结果,因为我的电脑同时还在做别的事情。只需用不同的、甚至重复的参数把这些代码片段多运行几次即可。)