如何在不使用 tokio::spawn 的情况下 运行 循环中的多个 Tokio 异步任务?

How to run multiple Tokio async tasks in a loop without using tokio::spawn?

我制作了一个还可以显示天气的 LED 时钟。我的程序在一个循环中做了几件不同的事情,每件事都有不同的间隔:

更新 LED 是最关键的:我不希望这被延迟,例如。正在获取天气。这应该不是问题,因为获取天气主要是异步 HTTP 调用。

这是我的代码:

let mut measure_light_stream = tokio::time::interval(Duration::from_secs(1));
let mut update_weather_stream = tokio::time::interval(WEATHER_FETCH_INTERVAL);
let mut update_leds_stream = tokio::time::interval(UPDATE_LEDS_INTERVAL);
loop {
    tokio::select! {
      _ = measure_light_stream.tick() => {
        let light = lm.get_light();
        light_smooth.sp = light;
      },
      _ = update_weather_stream.tick() => {
        let fetched_weather = weather_service.get(&config).await;
        // Store the fetched weather for later access from the displaying function.
        weather_clock.weather = fetched_weather.clone();
      },
      _ = update_leds_stream.tick() => {
        // Some code here that actually sets the LEDs.
        // This code accesses the weather_clock, the light level etc.
      },
    }
}

我意识到代码没有执行我想要它执行的操作 - 获取天气会阻止循环的执行。我明白了为什么 - tokio::select! 的文档说其他分支在 update_weather_stream.tick() 表达式完成后立即被取消。

我该如何做到这一点,以便在网络上等待获取天气时,LED 仍会更新?我发现我可以使用 tokio::spawn 来启动一个单独的非阻塞“线程”来获取天气,但后来我遇到了 weather_service 不是 Send 的问题,更不用说 weather_clock 不能在线程之间共享。我不想要这种复杂化,我可以在一个线程中处理所有 运行,就像 select! 所做的那样。

Reproducible example

use std::time::Duration;
use tokio::time::{interval, sleep};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut slow_stream = interval(Duration::from_secs(3));
    let mut fast_stream = interval(Duration::from_millis(200));
    // Note how access to this data is straightforward, I do not want
    // this to get more complicated, e.g. care about threads and Send.
    let mut val = 1;
    loop {
        tokio::select! {
          _ = fast_stream.tick() => {
            println!(".{}", val);
          },
          _ = slow_stream.tick() => {
            println!("Starting slow operation...");
            // The problem: During this await the dots are not printed.
            sleep(Duration::from_secs(1)).await;
            val += 1;
            println!("...done");
          },
        }
    }
}

您可以在同一任务中同时使用 tokio::join! 到 运行 多个异步操作。

这是一个例子:

async fn measure_light(halt: &Cell<bool>) {
    while !halt.get() {
        let light = lm.get_light();
        // ....

        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

async fn blink_led(halt: &Cell<bool>) {
    while !halt.get() {
        // LED blinking code

        tokio::time::sleep(UPDATE_LEDS_INTERVAL).await;
    }
}

async fn poll_weather(halt: &Cell<bool>) {
    while !halt.get() {
        let weather = weather_service.get(&config).await;
        // ...

        tokio::time::sleep(WEATHER_FETCH_INTERVAL).await;
    }
}

// example on how to terminate execution
async fn terminate(halt: &Cell<bool>) {
    tokio::time::sleep(Duration::from_secs(10)).await;
    halt.set(true);
}

async fn main() {
    let halt = Cell::new(false);
    tokio::join!(
        measure_light(&halt),
        blink_led(&halt),
        poll_weather(&halt),
        terminate(&halt),
    );
}

如果您正在使用 tokio::TcpStream 或其他 non-blocking IO,那么它应该允许并发执行。

我添加了一个 Cell 标志来暂停执行作为示例。您可以使用相同的技术在连接分支之间共享任何可变状态。


编辑:tokio::select! 也可以做同样的事情。与您的代码的主要区别在于实际的“业务逻辑”在 select.

等待的期货中

select 允许您丢弃未完成的期货而不是等待它们自行退出(因此 halt 终止标志不是必需的)。

async fn main() {
    tokio::select! {
        _ = measure_light() => {},
        _ = blink_led() = {},
        _ = poll_weather() => {},
    }
}

这是一个具体的解决方案,基于 stepan 回答的第二部分:

use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    // Cell is an acceptable complication when accessing the data.
    let val = std::cell::Cell::new(1);
    tokio::select! {
      _ = async {loop {
        println!(".{}", val.get());
        sleep(Duration::from_millis(200)).await;
      }} => {},
      _ = async {loop {
        println!("Starting slow operation...");
        // The problem: During this await the dots are not printed.
        sleep(Duration::from_secs(1)).await;
        val.set(val.get() + 1);
        println!("...done");
        sleep(Duration::from_secs(3)).await;
      }} => {},
    }
}

Playground link