如何将 napi_threadsafe_function 用于 NodeJS Native Addon
How to use napi_threadsafe_function for NodeJS Native Addon
我一直在浏览 NAPI documentation 以尝试了解它如何处理 multithreading
。根据文档,napi_create_threadsafe_function()
和 napi_call_threadsafe_function()
用于从多个线程创建和调用 js functions
。问题是文档不是那么简单,没有示例,我在其他任何地方都找不到。
如果有人有任何使用 napi_create_threadsafe_function()
和 napi_call_threadsafe_function()
的经验或知道在哪里可以找到它们的使用示例。请提供一个基本示例,以便我了解如何正确使用它们。
我正在编写一个 C
插件而不是 C++
并且需要使用这些功能。我没有使用包装器 node-addon-api
,而是直接 napi
如果其他人遇到这个问题。我终于找到了一个例子 here.
一旦我更好地理解它并获得工作示例,我将在此处更新。希望以后有需要的人会比我更轻松。
查看 Satyan 的回答
作为一个总结性的标签,我们可以说,N-API ThreadSafeFunctions 充当在工作线程上执行的异步 C/C++ 代码和JavaScript 信息交换层.
在进入技术之前,让我们考虑一个场景,我们有一个很长的 运行 流程繁重的任务要完成。我们都知道将此任务放在 node.js 主线程上不是一个好的选择,它会阻塞事件循环并阻塞队列中的所有其他任务。所以一个好的选择可能是在一个单独的线程中考虑这个任务(让我们称这个线程为工作线程)。 JavaScript 异步回调和Promise 正是采用这种方法。
假设我们已经在工作线程 上部署了 任务,我们已经准备好一部分结果,我们希望将其发送到 JavaScript层。然后涉及的过程是,将结果转换为napi_value,然后从C/C++调用回调函数JavaScript。不幸的是,这两个操作都不能从工作线程执行;这些操作应该专门从主线程完成。 JavaScript Promise 和 Callback,等到任务完成,然后切换到主线程以及任务结果,生成正常的 C/C++ 存储设施,例如结构等。然后执行 napi_value转换并从主线程调用JavaScript回调函数。
由于我们的任务非常长 运行 可能我们不想等到任务结束才与 JavaScript 层交换结果。
让我们考虑这样一个场景,我们在一个非常大的视频中搜索对象,我们更愿意将检测到的对象发送到 JavaScript 层,就像找到它时一样。
在这种情况下我们将不得不在任务仍在进行时开始发送任务结果。 这就是异步线程安全函数调用为我们提供帮助的场景。它充当工作线程和 JavaScript 层之间用于信息交换的安全隧道。让我们考虑以下函数片段
napi_value CAsyncStreamSearch(napi_env env, napi_callback_info info)
{
// The native addon function exposed to JavaScript
// This will be the funciton a node.js application calling.
}
void ExecuteWork(napi_env env, void* data)
{
// We will use this function to get the task done.
// This code will be executed on a worker thread.
}
void OnWorkComplete(napi_env env, napi_status status, void* data)
{
// after the `ExecuteWork` function exits, this
// callback function will be called on the main thread
}
void ThreadSafeCFunction4CallingJS(napi_env env, napi_value js_cb,
void* context, void* data)
{
// This funcion acts as a safe tunnel between the asynchronous C/C++ code
// executing the worker thread and the JavaScript layer for information exchange.
}
这前三个函数和我们熟悉的JavaScriptPromise和Callback几乎一样。第四个专门用于异步线程安全函数调用。在此,我们的长 运行 任务由工作线程 上的 ExecuteWork() 函数执行。让我们说 它指示我们不要从 ExecuteWork() 调用 JavaScript(以及任何 napi_value 结果转换)但允许从 ThreadSafeCFunction4CallingJS 这样做,只要我们使用相当于 C/C++ 函数指针的 napi 调用 ThreadSafeCFunction4CallingJS。然后我们可以将 JavaScript 调用打包到这个 ThreadSafeCFunction4CallingJS() 函数中。然后,当 ExecuteWork() 函数可以将结果传递给 ThreadSafeCFunction4CallingJS() 时,它在普通 C/C++ 存储单元(如结构等)中被调用。ThreadSafeCFunction4CallingJS() 将此结果转换为 napi_value 和调用 JavaScript 函数。
在幕后,ThreadSafeCFunction4CallingJS() 函数正在排队到事件循环,最终由主线程执行。
下面封装在 CAsyncStreamSearch() 中的代码片段负责通过使用 napi_create_threadsafe_function( ) 并且它是从本机插件的主线程本身完成的。同样,通过使用 napi_create_async_work() 函数创建工作线程的请求,然后通过 使用 napi_queue_async_work() 将工作放入事件队列 以便工作线程在将来拾取此项目。
napi_value CAsyncStreamSearch(napi_env env, napi_callback_info info)
{
-- -- -- --
-- -- -- --
// Create a thread-safe N-API callback function correspond to the C/C++ callback function
napi_create_threadsafe_function(env,
js_cb, NULL, work_name, 0, 1, NULL, NULL, NULL,
ThreadSafeCFunction4CallingJS, // the C/C++ callback function
// out: the asynchronous thread-safe JavaScript function
&(async_stream_data_ex->tsfn_StreamSearch));
// Create an async work item, that can be deployed in the node.js event queue
napi_create_async_work( env, NULL,
work_name,
ExecuteWork,
OnWorkComplete,
async_stream_data_ex,
// OUT: THE handle to the async work item
&(async_stream_data_ex->work_StreamSearch);)
// Queue the work item for execution.
napi_queue_async_work(env, async_stream_data_ex->work_StreamSearch);
return NULL;
}
然后在异步执行任务期间(ExecuteWork() 函数)通过调用 napi_call_threadsafe_function() 函数调用 ThreadSafeCFunction4CallingJS(),如下所示。
static void ExecuteWork(napi_env env, void *data)
{
// tsfn is napi equivalent of point to ThreadSafeCFunction4CallingJS
// function that we created at CAsyncStreamSearch function
napi_acquire_threadsafe_function( tsfn )
Loop
{
// this will eventually invoke ThreadSafeCFunction4CallingJS()
// we may call any number of time (in fact it can be called from any thread)
napi_call_threadsafe_function( tsfn, WorkResult, );
}
napi_release_threadsafe_function( tsfn,);
}
您指出的示例是最好的信息来源之一,它直接来自 node.js 团队本身。当我学习这个概念时,我也参考了同一个例子,在我的学习过程中,通过从中提取原始想法重新创建了这个例子,希望你会发现这大大简化了。
有售
https://github.com/msatyan/MyNodeC/blob/master/src/mync1/ThreadSafeAsyncStream.cpp
https://github.com/msatyan/MyNodeC/blob/master/test/ThreadSafeAsyncStream.js
这个网站的解决方案对我有用here
struct ThreadCtx {
ThreadCtx(Napi::Env env) {};
std::thread nativeThread;
Napi::ThreadSafeFunction tsfn;
};
void Target::Connect(const Napi::CallbackInfo& info) {
Napi::Env env = info.Env();
threadCtx = new ThreadCtx(env);
// Create a ThreadSafeFunction
threadCtx->tsfn = Napi::ThreadSafeFunction::New(env, info[0].As<Napi::Function>(), "Resource Name", 0 /* Unlimited queue */, 1 /* Only 1 thread */, threadCtx,
[&]( Napi::Env, void *finalizeData, ThreadCtx *context ) {
printf("Thread cleanup\n");
threadCtx->nativeThread.join();
},
(void*)nullptr
);
// Create a native thread
threadCtx->nativeThread = std::thread([&] {
auto callback = [](Napi::Env env, Napi::Function cb, char* buffer) {
cb.Call({Napi::String::New(env, buffer)});
};
char reply[1024];
memset(reply, 0, sizeof(reply));
while(true)
{
size_t reply_length = boost::asio::read(s, boost::asio::buffer(reply, sizeof(reply)));
if(reply_length <= 0) {
printf("Bad read from boost asio\n");
break;
}
// Callback (blocking) to JS
napi_status status = threadCtx->tsfn.BlockingCall(reply, callback);
if (status != napi_ok)
{
// Handle error
break;
}
}
// Release the thread-safe function
threadCtx->tsfn.Release();
});
}
我一直在浏览 NAPI documentation 以尝试了解它如何处理 multithreading
。根据文档,napi_create_threadsafe_function()
和 napi_call_threadsafe_function()
用于从多个线程创建和调用 js functions
。问题是文档不是那么简单,没有示例,我在其他任何地方都找不到。
如果有人有任何使用 napi_create_threadsafe_function()
和 napi_call_threadsafe_function()
的经验或知道在哪里可以找到它们的使用示例。请提供一个基本示例,以便我了解如何正确使用它们。
我正在编写一个 C
插件而不是 C++
并且需要使用这些功能。我没有使用包装器 node-addon-api
,而是直接 napi
如果其他人遇到这个问题。我终于找到了一个例子 here.
一旦我更好地理解它并获得工作示例,我将在此处更新。希望以后有需要的人会比我更轻松。
查看 Satyan 的回答
作为一个总结性的标签,我们可以说,N-API ThreadSafeFunctions 充当在工作线程上执行的异步 C/C++ 代码和JavaScript 信息交换层.
在进入技术之前,让我们考虑一个场景,我们有一个很长的 运行 流程繁重的任务要完成。我们都知道将此任务放在 node.js 主线程上不是一个好的选择,它会阻塞事件循环并阻塞队列中的所有其他任务。所以一个好的选择可能是在一个单独的线程中考虑这个任务(让我们称这个线程为工作线程)。 JavaScript 异步回调和Promise 正是采用这种方法。
假设我们已经在工作线程 上部署了 任务,我们已经准备好一部分结果,我们希望将其发送到 JavaScript层。然后涉及的过程是,将结果转换为napi_value,然后从C/C++调用回调函数JavaScript。不幸的是,这两个操作都不能从工作线程执行;这些操作应该专门从主线程完成。 JavaScript Promise 和 Callback,等到任务完成,然后切换到主线程以及任务结果,生成正常的 C/C++ 存储设施,例如结构等。然后执行 napi_value转换并从主线程调用JavaScript回调函数。
由于我们的任务非常长 运行 可能我们不想等到任务结束才与 JavaScript 层交换结果。 让我们考虑这样一个场景,我们在一个非常大的视频中搜索对象,我们更愿意将检测到的对象发送到 JavaScript 层,就像找到它时一样。 在这种情况下我们将不得不在任务仍在进行时开始发送任务结果。 这就是异步线程安全函数调用为我们提供帮助的场景。它充当工作线程和 JavaScript 层之间用于信息交换的安全隧道。让我们考虑以下函数片段
napi_value CAsyncStreamSearch(napi_env env, napi_callback_info info)
{
// The native addon function exposed to JavaScript
// This will be the funciton a node.js application calling.
}
void ExecuteWork(napi_env env, void* data)
{
// We will use this function to get the task done.
// This code will be executed on a worker thread.
}
void OnWorkComplete(napi_env env, napi_status status, void* data)
{
// after the `ExecuteWork` function exits, this
// callback function will be called on the main thread
}
void ThreadSafeCFunction4CallingJS(napi_env env, napi_value js_cb,
void* context, void* data)
{
// This funcion acts as a safe tunnel between the asynchronous C/C++ code
// executing the worker thread and the JavaScript layer for information exchange.
}
这前三个函数和我们熟悉的JavaScriptPromise和Callback几乎一样。第四个专门用于异步线程安全函数调用。在此,我们的长 运行 任务由工作线程 上的 ExecuteWork() 函数执行。让我们说 它指示我们不要从 ExecuteWork() 调用 JavaScript(以及任何 napi_value 结果转换)但允许从 ThreadSafeCFunction4CallingJS 这样做,只要我们使用相当于 C/C++ 函数指针的 napi 调用 ThreadSafeCFunction4CallingJS。然后我们可以将 JavaScript 调用打包到这个 ThreadSafeCFunction4CallingJS() 函数中。然后,当 ExecuteWork() 函数可以将结果传递给 ThreadSafeCFunction4CallingJS() 时,它在普通 C/C++ 存储单元(如结构等)中被调用。ThreadSafeCFunction4CallingJS() 将此结果转换为 napi_value 和调用 JavaScript 函数。 在幕后,ThreadSafeCFunction4CallingJS() 函数正在排队到事件循环,最终由主线程执行。
下面封装在 CAsyncStreamSearch() 中的代码片段负责通过使用 napi_create_threadsafe_function( ) 并且它是从本机插件的主线程本身完成的。同样,通过使用 napi_create_async_work() 函数创建工作线程的请求,然后通过 使用 napi_queue_async_work() 将工作放入事件队列 以便工作线程在将来拾取此项目。
napi_value CAsyncStreamSearch(napi_env env, napi_callback_info info)
{
-- -- -- --
-- -- -- --
// Create a thread-safe N-API callback function correspond to the C/C++ callback function
napi_create_threadsafe_function(env,
js_cb, NULL, work_name, 0, 1, NULL, NULL, NULL,
ThreadSafeCFunction4CallingJS, // the C/C++ callback function
// out: the asynchronous thread-safe JavaScript function
&(async_stream_data_ex->tsfn_StreamSearch));
// Create an async work item, that can be deployed in the node.js event queue
napi_create_async_work( env, NULL,
work_name,
ExecuteWork,
OnWorkComplete,
async_stream_data_ex,
// OUT: THE handle to the async work item
&(async_stream_data_ex->work_StreamSearch);)
// Queue the work item for execution.
napi_queue_async_work(env, async_stream_data_ex->work_StreamSearch);
return NULL;
}
然后在异步执行任务期间(ExecuteWork() 函数)通过调用 napi_call_threadsafe_function() 函数调用 ThreadSafeCFunction4CallingJS(),如下所示。
static void ExecuteWork(napi_env env, void *data)
{
// tsfn is napi equivalent of point to ThreadSafeCFunction4CallingJS
// function that we created at CAsyncStreamSearch function
napi_acquire_threadsafe_function( tsfn )
Loop
{
// this will eventually invoke ThreadSafeCFunction4CallingJS()
// we may call any number of time (in fact it can be called from any thread)
napi_call_threadsafe_function( tsfn, WorkResult, );
}
napi_release_threadsafe_function( tsfn,);
}
您指出的示例是最好的信息来源之一,它直接来自 node.js 团队本身。当我学习这个概念时,我也参考了同一个例子,在我的学习过程中,通过从中提取原始想法重新创建了这个例子,希望你会发现这大大简化了。
有售https://github.com/msatyan/MyNodeC/blob/master/src/mync1/ThreadSafeAsyncStream.cpp https://github.com/msatyan/MyNodeC/blob/master/test/ThreadSafeAsyncStream.js
这个网站的解决方案对我有用here
struct ThreadCtx {
ThreadCtx(Napi::Env env) {};
std::thread nativeThread;
Napi::ThreadSafeFunction tsfn;
};
void Target::Connect(const Napi::CallbackInfo& info) {
Napi::Env env = info.Env();
threadCtx = new ThreadCtx(env);
// Create a ThreadSafeFunction
threadCtx->tsfn = Napi::ThreadSafeFunction::New(env, info[0].As<Napi::Function>(), "Resource Name", 0 /* Unlimited queue */, 1 /* Only 1 thread */, threadCtx,
[&]( Napi::Env, void *finalizeData, ThreadCtx *context ) {
printf("Thread cleanup\n");
threadCtx->nativeThread.join();
},
(void*)nullptr
);
// Create a native thread
threadCtx->nativeThread = std::thread([&] {
auto callback = [](Napi::Env env, Napi::Function cb, char* buffer) {
cb.Call({Napi::String::New(env, buffer)});
};
char reply[1024];
memset(reply, 0, sizeof(reply));
while(true)
{
size_t reply_length = boost::asio::read(s, boost::asio::buffer(reply, sizeof(reply)));
if(reply_length <= 0) {
printf("Bad read from boost asio\n");
break;
}
// Callback (blocking) to JS
napi_status status = threadCtx->tsfn.BlockingCall(reply, callback);
if (status != napi_ok)
{
// Handle error
break;
}
}
// Release the thread-safe function
threadCtx->tsfn.Release();
});
}