如何将 napi_threadsafe_function 用于 NodeJS Native Addon

How to use napi_threadsafe_function for NodeJS Native Addon

我一直在浏览 NAPI documentation 以尝试了解它如何处理 multithreading。根据文档,napi_create_threadsafe_function()napi_call_threadsafe_function() 用于从多个线程创建和调用 js functions。问题是文档不是那么简单,没有示例,我在其他任何地方都找不到。

如果有人有任何使用 napi_create_threadsafe_function()napi_call_threadsafe_function() 的经验或知道在哪里可以找到它们的使用示例。请提供一个基本示例,以便我了解如何正确使用它们。

我正在编写一个 C 插件而不是 C++ 并且需要使用这些功能。我没有使用包装器 node-addon-api,而是直接 napi

如果其他人遇到这个问题。我终于找到了一个例子 here.

一旦我更好地理解它并获得工作示例,我将在此处更新。希望以后有需要的人会比我更轻松。

查看 Satyan 的回答

作为一个总结性的标签,我们可以说,N-API ThreadSafeFunctions 充当在工作线程上执行的异步 C/C++ 代码和JavaScript 信息交换层.

在进入技术之前,让我们考虑一个场景,我们有一个很长的 运行 流程繁重的任务要完成。我们都知道将此任务放在 node.js 主线程上不是一个好的选择,它会阻塞事件循环并阻塞队列中的所有其他任务。所以一个好的选择可能是在一个单独的线程中考虑这个任务(让我们称这个线程为工作线程)。 JavaScript 异步回调和Promise 正是采用这种方法。

假设我们已经在工作线程 上部署了 任务,我们已经准备好一部分结果,我们希望将其发送到 JavaScript层。然后涉及的过程是,将结果转换为napi_value,然后从C/C++调用回调函数JavaScript。不幸的是,这两个操作都不能从工作线程执行;这些操作应该专门从主线程完成。 JavaScript Promise 和 Callback,等到任务完成,然后切换到主线程以及任务结果,生成正常的 C/C++ 存储设施,例如结构等。然后执行 napi_value转换并从主线程调用JavaScript回调函数。

由于我们的任务非常长 运行 可能我们不想等到任务结束才与 JavaScript 层交换结果。 让我们考虑这样一个场景,我们在一个非常大的视频中搜索对象,我们更愿意将检测到的对象发送到 JavaScript 层,就像找到它时一样。 在这种情况下我们将不得不在任务仍在进行时开始发送任务结果这就是异步线程安全函数调用为我们提供帮助的场景。它充当工作线程和 JavaScript 层之间用于信息交换的安全隧道。让我们考虑以下函数片段

napi_value CAsyncStreamSearch(napi_env env, napi_callback_info info)
{
    // The native addon function exposed to JavaScript
    // This will be the funciton a node.js application calling.
}

void ExecuteWork(napi_env env, void* data)
{
    // We will use this function to get the task done.
    // This code will be executed on a worker thread.
}

void OnWorkComplete(napi_env env, napi_status status, void* data)
{
    // after the `ExecuteWork` function exits, this
    // callback function will be called on the main thread
}

void ThreadSafeCFunction4CallingJS(napi_env env, napi_value js_cb,
                 void* context, void* data)
{
   // This funcion acts as a safe tunnel between the asynchronous C/C++ code 
   // executing the worker thread and the JavaScript layer for information exchange.
}

这前三个函数和我们熟悉的JavaScriptPromise和Callback几乎一样。第四个专门用于异步线程安全函数调用。在此,我们的长 运行 任务由工作线程 上的 ExecuteWork() 函数执行。让我们说 它指示我们不要从 ExecuteWork() 调用 JavaScript(以及任何 napi_value 结果转换)但允许从 ThreadSafeCFunction4CallingJS 这样做,只要我们使用相当于 C/C++ 函数指针的 napi 调用 ThreadSafeCFunction4CallingJS。然后我们可以将 JavaScript 调用打包到这个 ThreadSafeCFunction4CallingJS() 函数中。然后,当 ExecuteWork() 函数可以将结果传递给 ThreadSafeCFunction4CallingJS() 时,它在普通 C/C++ 存储单元(如结构等)中被调用。ThreadSafeCFunction4CallingJS() 将此结果转换为 napi_value 和调用 JavaScript 函数。 在幕后,ThreadSafeCFunction4CallingJS() 函数正在排队到事件循环,最终由主线程执行。

下面封装在 CAsyncStreamSearch() 中的代码片段负责通过使用 napi_create_threadsafe_function( ) 并且它是从本机插件的主线程本身完成的。同样,通过使用 napi_create_async_work() 函数创建工作线程的请求,然后通过 使用 napi_queue_async_work() 将工作放入事件队列 以便工作线程在将来拾取此项目。

napi_value CAsyncStreamSearch(napi_env env, napi_callback_info info)
{
-- -- -- --
-- -- -- --
  // Create a thread-safe N-API callback function correspond to the C/C++ callback function
  napi_create_threadsafe_function(env,
      js_cb, NULL, work_name, 0, 1, NULL, NULL, NULL,
      ThreadSafeCFunction4CallingJS, // the C/C++ callback function
      // out: the asynchronous thread-safe JavaScript function
      &(async_stream_data_ex->tsfn_StreamSearch));

  // Create an async work item, that can be deployed in the node.js event queue
  napi_create_async_work( env, NULL,
       work_name,
       ExecuteWork,
       OnWorkComplete,
       async_stream_data_ex,
       // OUT: THE handle to the async work item
       &(async_stream_data_ex->work_StreamSearch);)

  // Queue the work item for execution.
  napi_queue_async_work(env, async_stream_data_ex->work_StreamSearch);

  return NULL;
}

然后在异步执行任务期间(ExecuteWork() 函数)通过​​调用 napi_call_threadsafe_function() 函数调用 ThreadSafeCFunction4CallingJS(),如下所示。

static void ExecuteWork(napi_env env, void *data)
{
  // tsfn is napi equivalent of point to ThreadSafeCFunction4CallingJS
  // function that we created at CAsyncStreamSearch function
  napi_acquire_threadsafe_function( tsfn )
  Loop
  {
    // this will eventually invoke ThreadSafeCFunction4CallingJS()
   // we may call any number of time (in fact it can be called from any thread)
    napi_call_threadsafe_function( tsfn, WorkResult, );
  }
  napi_release_threadsafe_function( tsfn,);
}

您指出的示例是最好的信息来源之一,它直接来自 node.js 团队本身。当我学习这个概念时,我也参考了同一个例子,在我的学习过程中,通过从中提取原始想法重新创建了这个例子,希望你会发现这大大简化了。

有售

https://github.com/msatyan/MyNodeC/blob/master/src/mync1/ThreadSafeAsyncStream.cpp https://github.com/msatyan/MyNodeC/blob/master/test/ThreadSafeAsyncStream.js

这个网站的解决方案对我有用here

struct ThreadCtx {
  ThreadCtx(Napi::Env env) {};

  std::thread nativeThread;
  Napi::ThreadSafeFunction tsfn;
};


void Target::Connect(const Napi::CallbackInfo& info) {
  Napi::Env env = info.Env();


  threadCtx = new ThreadCtx(env);

  // Create a ThreadSafeFunction
  threadCtx->tsfn = Napi::ThreadSafeFunction::New(env, info[0].As<Napi::Function>(), "Resource Name", 0 /* Unlimited queue */, 1 /* Only 1 thread */, threadCtx,
        [&]( Napi::Env, void *finalizeData, ThreadCtx *context ) {
            printf("Thread cleanup\n");
            threadCtx->nativeThread.join();
        },
        (void*)nullptr
        );

  // Create a native thread
  threadCtx->nativeThread = std::thread([&] {
    auto callback = [](Napi::Env env, Napi::Function cb, char* buffer) {
        cb.Call({Napi::String::New(env, buffer)});
    };



    char reply[1024];
    memset(reply, 0, sizeof(reply));
    while(true)
    {
        size_t reply_length = boost::asio::read(s, boost::asio::buffer(reply, sizeof(reply)));

        if(reply_length <= 0) {
            printf("Bad read from boost asio\n");
            break;
        }


        // Callback (blocking) to JS
        napi_status status = threadCtx->tsfn.BlockingCall(reply, callback);
        if (status != napi_ok)
        {
            // Handle error
            break;
        }
    }

    // Release the thread-safe function
    threadCtx->tsfn.Release();
  });
}