如何在不使用 tokio::spawn 的情况下 运行 循环中的多个 Tokio 异步任务?
How to run multiple Tokio async tasks in a loop without using tokio::spawn?
我制作了一个还可以显示天气的 LED 时钟。我的程序在一个循环中做了几件不同的事情,每件事都有不同的间隔:
- 每 50 毫秒更新一次 LED,
- 每 1 秒检查一次亮度级别(以调整亮度),
- 每 10 分钟获取一次天气,
- 实际上还有一些,但这无关紧要。
更新 LED 是最关键的:我不希望这被延迟,例如。正在获取天气。这应该不是问题,因为获取天气主要是异步 HTTP 调用。
这是我的代码:
let mut measure_light_stream = tokio::time::interval(Duration::from_secs(1));
let mut update_weather_stream = tokio::time::interval(WEATHER_FETCH_INTERVAL);
let mut update_leds_stream = tokio::time::interval(UPDATE_LEDS_INTERVAL);
loop {
tokio::select! {
_ = measure_light_stream.tick() => {
let light = lm.get_light();
light_smooth.sp = light;
},
_ = update_weather_stream.tick() => {
let fetched_weather = weather_service.get(&config).await;
// Store the fetched weather for later access from the displaying function.
weather_clock.weather = fetched_weather.clone();
},
_ = update_leds_stream.tick() => {
// Some code here that actually sets the LEDs.
// This code accesses the weather_clock, the light level etc.
},
}
}
我意识到代码没有执行我想要它执行的操作 - 获取天气会阻止循环的执行。我明白了为什么 - tokio::select!
的文档说其他分支在 update_weather_stream.tick()
表达式完成后立即被取消。
我该如何做到这一点,以便在网络上等待获取天气时,LED 仍会更新?我发现我可以使用 tokio::spawn
来启动一个单独的非阻塞“线程”来获取天气,但后来我遇到了 weather_service
不是 Send
的问题,更不用说 weather_clock
不能在线程之间共享。我不想要这种复杂化,我可以在一个线程中处理所有 运行,就像 select!
所做的那样。
use std::time::Duration;
use tokio::time::{interval, sleep};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut slow_stream = interval(Duration::from_secs(3));
let mut fast_stream = interval(Duration::from_millis(200));
// Note how access to this data is straightforward, I do not want
// this to get more complicated, e.g. care about threads and Send.
let mut val = 1;
loop {
tokio::select! {
_ = fast_stream.tick() => {
println!(".{}", val);
},
_ = slow_stream.tick() => {
println!("Starting slow operation...");
// The problem: During this await the dots are not printed.
sleep(Duration::from_secs(1)).await;
val += 1;
println!("...done");
},
}
}
}
您可以在同一任务中同时使用 tokio::join!
到 运行 多个异步操作。
这是一个例子:
async fn measure_light(halt: &Cell<bool>) {
while !halt.get() {
let light = lm.get_light();
// ....
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
async fn blink_led(halt: &Cell<bool>) {
while !halt.get() {
// LED blinking code
tokio::time::sleep(UPDATE_LEDS_INTERVAL).await;
}
}
async fn poll_weather(halt: &Cell<bool>) {
while !halt.get() {
let weather = weather_service.get(&config).await;
// ...
tokio::time::sleep(WEATHER_FETCH_INTERVAL).await;
}
}
// example on how to terminate execution
async fn terminate(halt: &Cell<bool>) {
tokio::time::sleep(Duration::from_secs(10)).await;
halt.set(true);
}
async fn main() {
let halt = Cell::new(false);
tokio::join!(
measure_light(&halt),
blink_led(&halt),
poll_weather(&halt),
terminate(&halt),
);
}
如果您正在使用 tokio::TcpStream
或其他 non-blocking IO,那么它应该允许并发执行。
我添加了一个 Cell
标志来暂停执行作为示例。您可以使用相同的技术在连接分支之间共享任何可变状态。
编辑:tokio::select!
也可以做同样的事情。与您的代码的主要区别在于实际的“业务逻辑”在 select
.
等待的期货中
select
允许您丢弃未完成的期货而不是等待它们自行退出(因此 halt
终止标志不是必需的)。
async fn main() {
tokio::select! {
_ = measure_light() => {},
_ = blink_led() = {},
_ = poll_weather() => {},
}
}
这是一个具体的解决方案,基于 stepan 回答的第二部分:
use std::time::Duration;
use tokio::time::sleep;
#[tokio::main]
async fn main() {
// Cell is an acceptable complication when accessing the data.
let val = std::cell::Cell::new(1);
tokio::select! {
_ = async {loop {
println!(".{}", val.get());
sleep(Duration::from_millis(200)).await;
}} => {},
_ = async {loop {
println!("Starting slow operation...");
// The problem: During this await the dots are not printed.
sleep(Duration::from_secs(1)).await;
val.set(val.get() + 1);
println!("...done");
sleep(Duration::from_secs(3)).await;
}} => {},
}
}
我制作了一个还可以显示天气的 LED 时钟。我的程序在一个循环中做了几件不同的事情,每件事都有不同的间隔:
- 每 50 毫秒更新一次 LED,
- 每 1 秒检查一次亮度级别(以调整亮度),
- 每 10 分钟获取一次天气,
- 实际上还有一些,但这无关紧要。
更新 LED 是最关键的:我不希望这被延迟,例如。正在获取天气。这应该不是问题,因为获取天气主要是异步 HTTP 调用。
这是我的代码:
let mut measure_light_stream = tokio::time::interval(Duration::from_secs(1));
let mut update_weather_stream = tokio::time::interval(WEATHER_FETCH_INTERVAL);
let mut update_leds_stream = tokio::time::interval(UPDATE_LEDS_INTERVAL);
loop {
tokio::select! {
_ = measure_light_stream.tick() => {
let light = lm.get_light();
light_smooth.sp = light;
},
_ = update_weather_stream.tick() => {
let fetched_weather = weather_service.get(&config).await;
// Store the fetched weather for later access from the displaying function.
weather_clock.weather = fetched_weather.clone();
},
_ = update_leds_stream.tick() => {
// Some code here that actually sets the LEDs.
// This code accesses the weather_clock, the light level etc.
},
}
}
我意识到代码没有执行我想要它执行的操作 - 获取天气会阻止循环的执行。我明白了为什么 - tokio::select!
的文档说其他分支在 update_weather_stream.tick()
表达式完成后立即被取消。
我该如何做到这一点,以便在网络上等待获取天气时,LED 仍会更新?我发现我可以使用 tokio::spawn
来启动一个单独的非阻塞“线程”来获取天气,但后来我遇到了 weather_service
不是 Send
的问题,更不用说 weather_clock
不能在线程之间共享。我不想要这种复杂化,我可以在一个线程中处理所有 运行,就像 select!
所做的那样。
use std::time::Duration;
use tokio::time::{interval, sleep};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut slow_stream = interval(Duration::from_secs(3));
let mut fast_stream = interval(Duration::from_millis(200));
// Note how access to this data is straightforward, I do not want
// this to get more complicated, e.g. care about threads and Send.
let mut val = 1;
loop {
tokio::select! {
_ = fast_stream.tick() => {
println!(".{}", val);
},
_ = slow_stream.tick() => {
println!("Starting slow operation...");
// The problem: During this await the dots are not printed.
sleep(Duration::from_secs(1)).await;
val += 1;
println!("...done");
},
}
}
}
您可以在同一任务中同时使用 tokio::join!
到 运行 多个异步操作。
这是一个例子:
async fn measure_light(halt: &Cell<bool>) {
while !halt.get() {
let light = lm.get_light();
// ....
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
async fn blink_led(halt: &Cell<bool>) {
while !halt.get() {
// LED blinking code
tokio::time::sleep(UPDATE_LEDS_INTERVAL).await;
}
}
async fn poll_weather(halt: &Cell<bool>) {
while !halt.get() {
let weather = weather_service.get(&config).await;
// ...
tokio::time::sleep(WEATHER_FETCH_INTERVAL).await;
}
}
// example on how to terminate execution
async fn terminate(halt: &Cell<bool>) {
tokio::time::sleep(Duration::from_secs(10)).await;
halt.set(true);
}
async fn main() {
let halt = Cell::new(false);
tokio::join!(
measure_light(&halt),
blink_led(&halt),
poll_weather(&halt),
terminate(&halt),
);
}
如果您正在使用 tokio::TcpStream
或其他 non-blocking IO,那么它应该允许并发执行。
我添加了一个 Cell
标志来暂停执行作为示例。您可以使用相同的技术在连接分支之间共享任何可变状态。
编辑:tokio::select!
也可以做同样的事情。与您的代码的主要区别在于实际的“业务逻辑”在 select
.
select
允许您丢弃未完成的期货而不是等待它们自行退出(因此 halt
终止标志不是必需的)。
async fn main() {
tokio::select! {
_ = measure_light() => {},
_ = blink_led() = {},
_ = poll_weather() => {},
}
}
这是一个具体的解决方案,基于 stepan 回答的第二部分:
use std::time::Duration;
use tokio::time::sleep;
#[tokio::main]
async fn main() {
// Cell is an acceptable complication when accessing the data.
let val = std::cell::Cell::new(1);
tokio::select! {
_ = async {loop {
println!(".{}", val.get());
sleep(Duration::from_millis(200)).await;
}} => {},
_ = async {loop {
println!("Starting slow operation...");
// The problem: During this await the dots are not printed.
sleep(Duration::from_secs(1)).await;
val.set(val.get() + 1);
println!("...done");
sleep(Duration::from_secs(3)).await;
}} => {},
}
}