tokio::select 中的 Rust lazy_static 和 tokio::sync::mpsc::channel
Rust lazy_static and tokio::sync::mpsc::channel in tokio::select
我最近开始用 Rust 编码,我很喜欢它。我正在编写一个项目,我想在其中“包装”一个 C-API。在一种情况下,我必须在 Rust 中定义回调,C 可以调用它。我让 bindgen 创建回调。由于代码需要 运行 有点异步,因此我为此使用了 tokio。
我想达到的目标
我将主函数创建为 tokio::main。在主函数中我创建了 2 个异步任务,一个监听通道,另一个触发 C-API 中的消息队列。如果消息可用,我想通过回调函数上的通道发送它们,这样我就可以在我正在侦听事件的任务上接收消息。稍后我想通过 SSE 或 GraphQL 订阅将这些消息发送给多个客户端。
我无法更改 C-Callbacks,因为它们需要可传递给 C-API,而且我必须使用回调,否则我不会收到消息。
我的最新方法看起来像这样简化:
use lazy_static::lazy_static;
use tokio::sync::{
mpsc::{channel, Receiver, Sender},
Mutex,
};
use bindgen::{notify_connect, notify_connectionstate};
lazy_static! {
static ref BROADCAST_CONNECT: Mutex<(Sender<bool>, Receiver<bool>)> = Mutex::new(channel(128));
static ref BROADCAST_CONNECTIONSTATE: Mutex<(Sender<u32>, Receiver<u32>)> = Mutex::new(channel(128));
}
#[tokio::main]
async fn main() {
unsafe { notify_connect(Some(_notify_connect)) } // pass the callback function to the C-API
unsafe { notify_connectionstate(Some(_notify_connectionstate)) } // pass the callback function to the C-API
tokio::spawn(async move { // wait for a channel to have a message
loop {
tokio::select! {
// wating for a channel to receive a message
Some(msg) = BROADCAST_CONNECT.lock().await.1.recv() => println!("{}", msg),
Some(msg) = BROADCAST_CONNECTIONSTATE.lock().await.1.recv() => println!("{}", msg),
}
}
});
let handle2 = tokio::spawn(async move {
loop {
unsafe {
message_queue_in_c(
some_handle,
true,
Duration::milliseconds(100).num_microseconds().unwrap(),
)
}
}
});
handle.await.unwrap();
habdle2.await.unwrap();
}
// the callback function that gets called from the C-API
unsafe extern "C" fn _notify_connect(is_connected: bool) {
// C-API is not async, so use synchronous lock
match BROADCAST_CONNECT.try_lock() {
Ok(value) => match value.0.blocking_send(is_connected) {
Ok(_) => {}
Err(e) => {
eprintln!("{}", e)
}
},
Err(e) => {
eprintln!("{}", e)
}
}
}
unsafe extern "C" fn _notify_connectionstate(connectionstate: u32) {
match BROADCAST_CONNECTIONSTATE.try_lock() {
Ok(value) => match value.0.blocking_send(connectionstate) {
Ok(_) => {}
Err(e) => {
eprintln!("{}", e)
}
},
Err(e) => {
eprintln!("{}", e)
}
}
}
问题:
error[E0716]: temporary value dropped while borrowed
--> src/main.rs:37:29
|
35 | / tokio::select! {
36 | | Some(msg) = BROADCAST_CONNECT.lock().await.1.recv() => println!("{}", msg),
37 | | Some(msg) = BROADCAST_CONNECTIONSTATE.lock().await.1.recv() => println!("{}", msg),
| | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ creates a temporary which is freed while still in use
38 | | }
| | -
| | |
| |_____________temporary value is freed at the end of this statement
| borrow later captured here by closure
|
= note: consider using a `let` binding to create a longer lived value
我理解消息以及为什么会发生这种情况,但我想不出解决方案。
我有一个使用 crossbeam 通道的工作示例,但我宁愿使用来自 tokio 的异步通道,所以我没有那么多依赖项并且一切都是异步的。
工作示例:
use lazy_static::lazy_static;
use crossbeam::{
channel::{bounded, Receiver, Sender},
select,
};
use bindgen::{notify_connect, notify_connectionstate};
lazy_static! {
static ref BROADCAST_CONNECT: (Sender<bool>, Receiver<bool>) = bounded(128);
static ref BROADCAST_CONNECTIONSTATE: (Sender<u32>, Receiver<u32>) = bounded(128);
}
#[tokio::main]
async fn main() {
unsafe { notify_connect(Some(_notify_connect)) } // pass the callback function to the C-API
unsafe { notify_connectionstate(Some(_notify_connectionstate)) } // pass the callback function to the C-API
let handle1 = tokio::spawn(async move {
loop {
select! {
recv(&BROADCAST_CONNECT.1) -> msg => println!("is_connected: {:?}", msg.unwrap()),
recv(&BROADCAST_CONNECTIONSTATE.1) -> msg => println!("connectionstate: {:?}", msg.unwrap()),
}
}
});
let handle2 = tokio::spawn(async move {
loop {
unsafe {
message_queue_in_c(
some_handle,
true,
Duration::milliseconds(100).num_microseconds().unwrap(),
)
}
}
});
handle.await.unwrap();
handle2.await.unwrap();
}
// the callback function thats gets called from the C-API
unsafe extern "C" fn _notify_connect(is_connected: bool) {
match &BROADCAST_CONNECT.0.send(is_connected) {
Ok(_) => {}
Err(e) => eprintln!("{}", e),
};
}
unsafe extern "C" fn _notify_connectionstate(connectionstate: u32) {
match BROADCAST_CONNECTIONSTATE.0.send(connectionstate) {
Ok(_) => {}
Err(e) => eprintln!("{}", e),
}
}
备选
我也没有开始工作的另一种选择是使用某种本地函数或使用闭包。但我不确定这是否会,甚至是否会如何运作。也许有人有想法。如果这样的事情可行,那就太好了,所以我不必使用 lazy_static (我宁愿在我的代码中没有 global/static 变量)
use tokio::sync::{
mpsc::{channel, Receiver, Sender},
Mutex,
};
use bindgen::{notify_connect, notify_connectionstate};
#[tokio::main]
async fn main() {
let app = app::App::new();
let mut broadcast_connect = channel::<bool>(128);
let mut broadcast_connectionstate = channel::<bool>(128);
let notify_connect = {
unsafe extern "C" fn _notify_connect(is_connected: bool) {
match broadcast_connect.0.blocking_send(is_connected) {
Ok(_) => {}
Err(e) => {
eprintln!("{}", e)
}
}
}
};
let notify_connectionstate = {
unsafe extern "C" fn _notify_connectionstate(connectionstate: u32) {
match broadcast_connectionstate.0.blocking_send(connectionstate) {
Ok(_) => {}
Err(e) => {
eprintln!("{}", e)
}
}
}
};
unsafe { notify_connect(Some(notify_connect)) } // pass the callback function to the C-API
unsafe { notify_connectionstate(Some(notify_connectionstate)) } // pass the callback function to the C-API
let handle = tokio::spawn(async move {
loop {
tokio::select! {
Some(msg) = broadcast_connect.1.recv() => println!("{}", msg),
Some(msg) = broadcast_connectionstate.1.recv() => println!("{}", msg),
}
}
});
let handle2 = tokio::spawn(async move {
loop {
unsafe {
message_queue_in_c(
some_handle,
true,
Duration::milliseconds(100).num_microseconds().unwrap(),
)
}
}
});
handle.await.unwrap();
handle2.await.unwrap();
}
这种方法的问题
can't capture dynamic environment in a fn item
--> src/main.rs:47:19
|
47 | match broadcast_connectionstate.0.blocking_send(connectionstate) {
| ^^^^^^^^^^^^^^^^^^^^^^^^^
|
= help: use the `|| { ... }` closure form instead
如果有人能解决我的任何一个问题,那就太好了。如果这是一种全新的方法,那也很好。如果 channels 或 tokio 或其他什么不是可行的方法,那也没关系。主要是我用的tokio,因为一个crate我也在用tokio,所以我不需要有更多的依赖。
非常感谢您阅读到这里。
如果您对第一个示例进行以下更改,它应该会起作用:
- 将
tokio::sync::Mutex
替换为 std::sync::Mutex
,这样您就不必在回调中使用 try_lock
。
- 不要将接收者存储在互斥体中,只存储发送者。
- 在回调中,要么使用无界通道,要么确保在发送前释放锁。
- 运行 使用
std::thread::spawn
而不是 tokio::spawn
的专用线程上的阻塞 C 代码。 (why?)
要不将接收器存储在互斥量中,您可以这样做:
static ref BROADCAST_CONNECT: Mutex<Option<Sender<bool>>> = Mutex::new(None);
// in main
let (send, recv) = channel(128);
*BROADCAST_CONNECT.lock().unwrap() = Some(send);
如果你想要一个有界频道,你可以通过先克隆频道,然后在锁上调用drop
,然后使用blocking_send
发送来释放锁。使用无限通道,这无关紧要,因为发送是即时的。
// in C callback
let lock = BROADCAST_CONNECT.lock().unwrap();
let send = lock.as_ref().clone();
drop(lock);
send.blocking_send(...);
我最近开始用 Rust 编码,我很喜欢它。我正在编写一个项目,我想在其中“包装”一个 C-API。在一种情况下,我必须在 Rust 中定义回调,C 可以调用它。我让 bindgen 创建回调。由于代码需要 运行 有点异步,因此我为此使用了 tokio。
我想达到的目标
我将主函数创建为 tokio::main。在主函数中我创建了 2 个异步任务,一个监听通道,另一个触发 C-API 中的消息队列。如果消息可用,我想通过回调函数上的通道发送它们,这样我就可以在我正在侦听事件的任务上接收消息。稍后我想通过 SSE 或 GraphQL 订阅将这些消息发送给多个客户端。
我无法更改 C-Callbacks,因为它们需要可传递给 C-API,而且我必须使用回调,否则我不会收到消息。
我的最新方法看起来像这样简化:
use lazy_static::lazy_static;
use tokio::sync::{
mpsc::{channel, Receiver, Sender},
Mutex,
};
use bindgen::{notify_connect, notify_connectionstate};
lazy_static! {
static ref BROADCAST_CONNECT: Mutex<(Sender<bool>, Receiver<bool>)> = Mutex::new(channel(128));
static ref BROADCAST_CONNECTIONSTATE: Mutex<(Sender<u32>, Receiver<u32>)> = Mutex::new(channel(128));
}
#[tokio::main]
async fn main() {
unsafe { notify_connect(Some(_notify_connect)) } // pass the callback function to the C-API
unsafe { notify_connectionstate(Some(_notify_connectionstate)) } // pass the callback function to the C-API
tokio::spawn(async move { // wait for a channel to have a message
loop {
tokio::select! {
// wating for a channel to receive a message
Some(msg) = BROADCAST_CONNECT.lock().await.1.recv() => println!("{}", msg),
Some(msg) = BROADCAST_CONNECTIONSTATE.lock().await.1.recv() => println!("{}", msg),
}
}
});
let handle2 = tokio::spawn(async move {
loop {
unsafe {
message_queue_in_c(
some_handle,
true,
Duration::milliseconds(100).num_microseconds().unwrap(),
)
}
}
});
handle.await.unwrap();
habdle2.await.unwrap();
}
// the callback function that gets called from the C-API
unsafe extern "C" fn _notify_connect(is_connected: bool) {
// C-API is not async, so use synchronous lock
match BROADCAST_CONNECT.try_lock() {
Ok(value) => match value.0.blocking_send(is_connected) {
Ok(_) => {}
Err(e) => {
eprintln!("{}", e)
}
},
Err(e) => {
eprintln!("{}", e)
}
}
}
unsafe extern "C" fn _notify_connectionstate(connectionstate: u32) {
match BROADCAST_CONNECTIONSTATE.try_lock() {
Ok(value) => match value.0.blocking_send(connectionstate) {
Ok(_) => {}
Err(e) => {
eprintln!("{}", e)
}
},
Err(e) => {
eprintln!("{}", e)
}
}
}
问题:
error[E0716]: temporary value dropped while borrowed
--> src/main.rs:37:29
|
35 | / tokio::select! {
36 | | Some(msg) = BROADCAST_CONNECT.lock().await.1.recv() => println!("{}", msg),
37 | | Some(msg) = BROADCAST_CONNECTIONSTATE.lock().await.1.recv() => println!("{}", msg),
| | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ creates a temporary which is freed while still in use
38 | | }
| | -
| | |
| |_____________temporary value is freed at the end of this statement
| borrow later captured here by closure
|
= note: consider using a `let` binding to create a longer lived value
我理解消息以及为什么会发生这种情况,但我想不出解决方案。
我有一个使用 crossbeam 通道的工作示例,但我宁愿使用来自 tokio 的异步通道,所以我没有那么多依赖项并且一切都是异步的。
工作示例:
use lazy_static::lazy_static;
use crossbeam::{
channel::{bounded, Receiver, Sender},
select,
};
use bindgen::{notify_connect, notify_connectionstate};
lazy_static! {
static ref BROADCAST_CONNECT: (Sender<bool>, Receiver<bool>) = bounded(128);
static ref BROADCAST_CONNECTIONSTATE: (Sender<u32>, Receiver<u32>) = bounded(128);
}
#[tokio::main]
async fn main() {
unsafe { notify_connect(Some(_notify_connect)) } // pass the callback function to the C-API
unsafe { notify_connectionstate(Some(_notify_connectionstate)) } // pass the callback function to the C-API
let handle1 = tokio::spawn(async move {
loop {
select! {
recv(&BROADCAST_CONNECT.1) -> msg => println!("is_connected: {:?}", msg.unwrap()),
recv(&BROADCAST_CONNECTIONSTATE.1) -> msg => println!("connectionstate: {:?}", msg.unwrap()),
}
}
});
let handle2 = tokio::spawn(async move {
loop {
unsafe {
message_queue_in_c(
some_handle,
true,
Duration::milliseconds(100).num_microseconds().unwrap(),
)
}
}
});
handle.await.unwrap();
handle2.await.unwrap();
}
// the callback function thats gets called from the C-API
unsafe extern "C" fn _notify_connect(is_connected: bool) {
match &BROADCAST_CONNECT.0.send(is_connected) {
Ok(_) => {}
Err(e) => eprintln!("{}", e),
};
}
unsafe extern "C" fn _notify_connectionstate(connectionstate: u32) {
match BROADCAST_CONNECTIONSTATE.0.send(connectionstate) {
Ok(_) => {}
Err(e) => eprintln!("{}", e),
}
}
备选
我也没有开始工作的另一种选择是使用某种本地函数或使用闭包。但我不确定这是否会,甚至是否会如何运作。也许有人有想法。如果这样的事情可行,那就太好了,所以我不必使用 lazy_static (我宁愿在我的代码中没有 global/static 变量)
use tokio::sync::{
mpsc::{channel, Receiver, Sender},
Mutex,
};
use bindgen::{notify_connect, notify_connectionstate};
#[tokio::main]
async fn main() {
let app = app::App::new();
let mut broadcast_connect = channel::<bool>(128);
let mut broadcast_connectionstate = channel::<bool>(128);
let notify_connect = {
unsafe extern "C" fn _notify_connect(is_connected: bool) {
match broadcast_connect.0.blocking_send(is_connected) {
Ok(_) => {}
Err(e) => {
eprintln!("{}", e)
}
}
}
};
let notify_connectionstate = {
unsafe extern "C" fn _notify_connectionstate(connectionstate: u32) {
match broadcast_connectionstate.0.blocking_send(connectionstate) {
Ok(_) => {}
Err(e) => {
eprintln!("{}", e)
}
}
}
};
unsafe { notify_connect(Some(notify_connect)) } // pass the callback function to the C-API
unsafe { notify_connectionstate(Some(notify_connectionstate)) } // pass the callback function to the C-API
let handle = tokio::spawn(async move {
loop {
tokio::select! {
Some(msg) = broadcast_connect.1.recv() => println!("{}", msg),
Some(msg) = broadcast_connectionstate.1.recv() => println!("{}", msg),
}
}
});
let handle2 = tokio::spawn(async move {
loop {
unsafe {
message_queue_in_c(
some_handle,
true,
Duration::milliseconds(100).num_microseconds().unwrap(),
)
}
}
});
handle.await.unwrap();
handle2.await.unwrap();
}
这种方法的问题
can't capture dynamic environment in a fn item
--> src/main.rs:47:19
|
47 | match broadcast_connectionstate.0.blocking_send(connectionstate) {
| ^^^^^^^^^^^^^^^^^^^^^^^^^
|
= help: use the `|| { ... }` closure form instead
如果有人能解决我的任何一个问题,那就太好了。如果这是一种全新的方法,那也很好。如果 channels 或 tokio 或其他什么不是可行的方法,那也没关系。主要是我用的tokio,因为一个crate我也在用tokio,所以我不需要有更多的依赖。
非常感谢您阅读到这里。
如果您对第一个示例进行以下更改,它应该会起作用:
- 将
tokio::sync::Mutex
替换为std::sync::Mutex
,这样您就不必在回调中使用try_lock
。 - 不要将接收者存储在互斥体中,只存储发送者。
- 在回调中,要么使用无界通道,要么确保在发送前释放锁。
- 运行 使用
std::thread::spawn
而不是tokio::spawn
的专用线程上的阻塞 C 代码。 (why?)
要不将接收器存储在互斥量中,您可以这样做:
static ref BROADCAST_CONNECT: Mutex<Option<Sender<bool>>> = Mutex::new(None);
// in main
let (send, recv) = channel(128);
*BROADCAST_CONNECT.lock().unwrap() = Some(send);
如果你想要一个有界频道,你可以通过先克隆频道,然后在锁上调用drop
,然后使用blocking_send
发送来释放锁。使用无限通道,这无关紧要,因为发送是即时的。
// in C callback
let lock = BROADCAST_CONNECT.lock().unwrap();
let send = lock.as_ref().clone();
drop(lock);
send.blocking_send(...);