有没有办法创建一个异步流生成器来产生重复调用函数的结果?

Is there any way to create a async stream generator that yields the result of repeatedly calling a function?

我想构建一个程序来收集天气更新并将它们表示为流。我想在无限循环中调用 get_weather(),在 finishstart.

之间有 60 秒的延迟

简化版如下所示:

async fn get_weather() -> Weather { /* ... */ }

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    loop {
        tokio::timer::delay_for(std::time::Duration::from_secs(60)).await;
        let weather = get_weather().await;
        yield weather; // This is not supported
        // Note: waiting for get_weather() stops the timer and avoids overflows.
    }
}

有什么方法可以轻松做到这一点?

get_weather() 超过 60 秒时,使用 tokio::timer::Interval 将不起作用:

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    tokio::timer::Interval::new_with_delay(std::time::Duration::from_secs(60))
        .then(|| get_weather())
}

如果发生这种情况,下一个功能将立即启动。我想在上一个 get_weather() 开始和下一个 get_weather() 开始之间保持 60 秒。

使用 stream::unfold 从 "world of futures" 转到 "world of streams"。我们不需要任何额外的状态,所以我们使用空元组:

use futures::StreamExt; // 0.3.4
use std::time::Duration;
use tokio::time; // 0.2.11

struct Weather;

async fn get_weather() -> Weather {
    Weather
}

const BETWEEN: Duration = Duration::from_secs(1);

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    futures::stream::unfold((), |_| async {
        time::delay_for(BETWEEN).await;
        let weather = get_weather().await;
        Some((weather, ()))
    })
}

#[tokio::main]
async fn main() {
    get_weather_stream()
        .take(3)
        .for_each(|_v| async {
            println!("Got the weather");
        })
        .await;
}
% time ./target/debug/example

Got the weather
Got the weather
Got the weather

real    3.085   3085495us
user    0.004   3928us
sys     0.003   3151us

另请参阅:

  • Creating a stream of values while calling async fns?