为什么在 crossbeam_channel::select 旁边调用 tokio::spawn 会有延迟?
Why does tokio::spawn have a delay when called next to crossbeam_channel::select?
我正在创建一个会产生其他任务的任务。其中一些需要一些时间,因此无法等待,但可以 运行 并行:
src/main.rs
use crossbeam::crossbeam_channel::{bounded, select};
#[tokio::main]
async fn main() {
let (s, r) = bounded::<usize>(1);
tokio::spawn(async move {
let mut counter = 0;
loop {
let loop_id = counter.clone();
tokio::spawn(async move { // why this one was not fired?
println!("inner task {}", loop_id);
}); // .await.unwrap(); - solves issue, but this is long task which cannot be awaited
println!("loop {}", loop_id);
select! {
recv(r) -> rr => {
// match rr {
// Ok(ee) => {
// println!("received from channel {}", loop_id);
// tokio::spawn(async move {
// println!("received from channel task {}", loop_id);
// });
// },
// Err(e) => println!("{}", e),
// };
},
// more recv(some_channel) ->
}
counter = counter + 1;
}
});
// let s_clone = s.clone();
// tokio::spawn(async move {
// s_clone.send(2).unwrap();
// });
loop {
// rest of the program
}
}
我注意到了奇怪的行为。这输出:
loop 0
我期待它也输出 inner task 0
。
如果我向通道发送一个值,输出将是:
loop 0
inner task 0
loop 1
缺少这个 inner task 1
。
为什么 inner task
产生一圈延迟?
我第一次注意到这种行为 'received from channel task' 延迟了一个循环,但是当我减少代码以准备示例时,这种情况开始发生 'inner task'。可能值得一提的是,如果我将 second tokio::spawn
写到另一个,则只有最后一个会出现此问题。调用 tokio::spawn
和 select!
时有什么我应该注意的吗?是什么导致了这一循环延迟?
Cargo.toml 依赖关系
[dependencies]
tokio = { version = "0.2", features = ["full"] }
crossbeam = "0.7"
生锈 1.46,Windows10
select!
正在阻塞,tokio::spawn
say:
的文档
The spawned task may execute on the current thread, or it may be sent to a different thread to be executed.
在这种情况下,select!
“future”实际上是一个阻塞函数,spawn
不使用新线程(无论是在第一次调用还是循环内) .
因为你没有告诉 tokio 你要阻塞,tokio 认为不需要另一个线程(从 tokio 的角度来看,你只有 3 个永远不应该阻塞的 futures,所以你为什么还需要另一个线程?)。
解决方案是对 select!
-ing 闭包使用 tokio::task::spawn_blocking
(这将不再是未来,所以 async move {}
现在是 move || {}
)。
现在 tokio 会知道这个函数实际上是阻塞的,并将它移动到另一个线程(同时将所有实际的未来保留在其他执行线程中)。
use crossbeam::crossbeam_channel::{bounded, select};
#[tokio::main]
async fn main() {
let (s, r) = bounded::<usize>(1);
tokio::task::spawn_blocking(move || {
// ...
});
loop {
// rest of the program
}
}
另一种可能的解决方案是使用像 tokio::sync::mpsc
这样的 non-blocking 通道,您可以在其上使用 await
并获得预期的行为,例如 playground example 直接recv().await
或 tokio::select!
,像这样:
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (mut s, mut r) = mpsc::channel::<usize>(1);
tokio::spawn(async move {
loop {
// ...
tokio::select! {
Some(i) = r.recv() => {
println!("got = {}", i);
}
}
}
});
loop {
// rest of the program
}
}
我正在创建一个会产生其他任务的任务。其中一些需要一些时间,因此无法等待,但可以 运行 并行:
src/main.rs
use crossbeam::crossbeam_channel::{bounded, select};
#[tokio::main]
async fn main() {
let (s, r) = bounded::<usize>(1);
tokio::spawn(async move {
let mut counter = 0;
loop {
let loop_id = counter.clone();
tokio::spawn(async move { // why this one was not fired?
println!("inner task {}", loop_id);
}); // .await.unwrap(); - solves issue, but this is long task which cannot be awaited
println!("loop {}", loop_id);
select! {
recv(r) -> rr => {
// match rr {
// Ok(ee) => {
// println!("received from channel {}", loop_id);
// tokio::spawn(async move {
// println!("received from channel task {}", loop_id);
// });
// },
// Err(e) => println!("{}", e),
// };
},
// more recv(some_channel) ->
}
counter = counter + 1;
}
});
// let s_clone = s.clone();
// tokio::spawn(async move {
// s_clone.send(2).unwrap();
// });
loop {
// rest of the program
}
}
我注意到了奇怪的行为。这输出:
loop 0
我期待它也输出 inner task 0
。
如果我向通道发送一个值,输出将是:
loop 0
inner task 0
loop 1
缺少这个 inner task 1
。
为什么 inner task
产生一圈延迟?
我第一次注意到这种行为 'received from channel task' 延迟了一个循环,但是当我减少代码以准备示例时,这种情况开始发生 'inner task'。可能值得一提的是,如果我将 second tokio::spawn
写到另一个,则只有最后一个会出现此问题。调用 tokio::spawn
和 select!
时有什么我应该注意的吗?是什么导致了这一循环延迟?
Cargo.toml 依赖关系
[dependencies]
tokio = { version = "0.2", features = ["full"] }
crossbeam = "0.7"
生锈 1.46,Windows10
select!
正在阻塞,tokio::spawn
say:
The spawned task may execute on the current thread, or it may be sent to a different thread to be executed.
在这种情况下,select!
“future”实际上是一个阻塞函数,spawn
不使用新线程(无论是在第一次调用还是循环内) .
因为你没有告诉 tokio 你要阻塞,tokio 认为不需要另一个线程(从 tokio 的角度来看,你只有 3 个永远不应该阻塞的 futures,所以你为什么还需要另一个线程?)。
解决方案是对 select!
-ing 闭包使用 tokio::task::spawn_blocking
(这将不再是未来,所以 async move {}
现在是 move || {}
)。
现在 tokio 会知道这个函数实际上是阻塞的,并将它移动到另一个线程(同时将所有实际的未来保留在其他执行线程中)。
use crossbeam::crossbeam_channel::{bounded, select};
#[tokio::main]
async fn main() {
let (s, r) = bounded::<usize>(1);
tokio::task::spawn_blocking(move || {
// ...
});
loop {
// rest of the program
}
}
另一种可能的解决方案是使用像 tokio::sync::mpsc
这样的 non-blocking 通道,您可以在其上使用 await
并获得预期的行为,例如 playground example 直接recv().await
或 tokio::select!
,像这样:
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (mut s, mut r) = mpsc::channel::<usize>(1);
tokio::spawn(async move {
loop {
// ...
tokio::select! {
Some(i) = r.recv() => {
println!("got = {}", i);
}
}
}
});
loop {
// rest of the program
}
}