在异步函数中生成 NodeJS 工作线程不遵守承诺解决方案

Spawning NodeJS worker thread inside of async function doesn't respect promise resolution

我有一个启动 NodeJS 工作线程的异步函数,如下所示:

    encode : async (config) => {

      if (isMainThread) {

            const encode_worker = new Worker(`./service-encode.js`, { workerData: config });

            encode_worker.on('message', (transcode_data) => { 
              log.info("%o", transcode_data);
              return transcode_data;
            });

            encode_worker.on('error', (err) => { log.error(err)});
            encode_worker.on('exit', (code) => {
              if (code !== 0)
                throw new Error(`Encoding stopped with exit code [ ${code} ]`);
                console.log("* * * EXITED ENCODER WORKER * * *")

            });

      }

    },

serivce-encode.js 文件中,我得到了以下使用异步函数的代码。请注意,我使用 postMessage 表示已完成。

var transcoder        = require('./transcoder');

const {Worker, isMainThread, parentPort, workerData} = require('worker_threads');

console.log("* * * STARTING ENCODE THREAD * * *\n");
console.log(workerData);

transcoder.run(workerData)
          .then((results) => {
              transcode_data = results;
              parentPort.postMessage(transcode_data);
          })
          .catch(err => { throw err });

然后,我使用以下示例代码,但上面 'message' 事件中的代码会立即触发。也就是好像没有等到完成:

encode(conf).then((encode_data) => { console.log("Encode Data :", encode_data); 

encode 函数工作正常,但 console.log 语句在调用 encode() 函数时立即执行 — encode_data var 是 undefined。既然encode中的return语句是在message事件中,那个时候不应该解决async函数的promise吗?

所以,您的 async 函数中的代码不支持 promises。您不能只是在 async 函数中随机抛出异步(但不是基于承诺的)代码并期望任何东西都能正常工作。 async 函数可以与您 await 基于 promise 的异步函数一起正常工作。否则,它对那里的异步操作一无所知。这就是为什么不等待任何事情完成就立即调用 encode() returns 的原因。

此外,return transcode_data 在异步回调中。返回回调内部只是回到调用回调的系统代码,并被尽职地忽略。您没有从 async 函数本身返回任何东西。您正在返回那个回调。

由于您的操作不是基于 promise 的,要解决这个问题,您必须通过将其包装在 promise 中使其基于 promise,然后在需要时使用适当的值手动解决或拒绝该 promise。你可以这样做:

encode: (config) => {

    if (isMainThread) {

        return new Promise((resolve, reject) => {
            const encode_worker = new Worker(`./service-encode.js`, { workerData: config });

            encode_worker.on('message', (transcode_data) => {
                log.info("%o", transcode_data);
                resolve(transcode_data);
            });

            encode_worker.on('error', (err) => {
                log.error(err)
                reject(err);
            });

            encode_worker.on('exit', (code) => {
                if (code !== 0) {
                    reject(new Error(`Encoding stopped with exit code [ ${code} ]`));
                }
                console.log("* * * EXITED ENCODER WORKER * * *")
            });

        });


    } else {
        // should return a promise from all calling paths
        return Promise.reject(new Error("Can only call encode() from main thread"));
    }

},

仅供参考,此代码假定您在此处从承诺中寻找的“结果”是您从该工作人员处获得的第一个 message transcode_data