有没有办法创建一个异步流生成器来产生重复调用函数的结果?
Is there any way to create a async stream generator that yields the result of repeatedly calling a function?
我想构建一个程序来收集天气更新并将它们表示为流。我想在无限循环中调用 get_weather()
,在 finish 和 start.
之间有 60 秒的延迟
简化版如下所示:
async fn get_weather() -> Weather { /* ... */ }
fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
loop {
tokio::timer::delay_for(std::time::Duration::from_secs(60)).await;
let weather = get_weather().await;
yield weather; // This is not supported
// Note: waiting for get_weather() stops the timer and avoids overflows.
}
}
有什么方法可以轻松做到这一点?
当 get_weather()
超过 60 秒时,使用 tokio::timer::Interval
将不起作用:
fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
tokio::timer::Interval::new_with_delay(std::time::Duration::from_secs(60))
.then(|| get_weather())
}
如果发生这种情况,下一个功能将立即启动。我想在上一个 get_weather()
开始和下一个 get_weather()
开始之间保持 60 秒。
使用 stream::unfold
从 "world of futures" 转到 "world of streams"。我们不需要任何额外的状态,所以我们使用空元组:
use futures::StreamExt; // 0.3.4
use std::time::Duration;
use tokio::time; // 0.2.11
struct Weather;
async fn get_weather() -> Weather {
Weather
}
const BETWEEN: Duration = Duration::from_secs(1);
fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
futures::stream::unfold((), |_| async {
time::delay_for(BETWEEN).await;
let weather = get_weather().await;
Some((weather, ()))
})
}
#[tokio::main]
async fn main() {
get_weather_stream()
.take(3)
.for_each(|_v| async {
println!("Got the weather");
})
.await;
}
% time ./target/debug/example
Got the weather
Got the weather
Got the weather
real 3.085 3085495us
user 0.004 3928us
sys 0.003 3151us
另请参阅:
- Creating a stream of values while calling async fns?
我想构建一个程序来收集天气更新并将它们表示为流。我想在无限循环中调用 get_weather()
,在 finish 和 start.
简化版如下所示:
async fn get_weather() -> Weather { /* ... */ }
fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
loop {
tokio::timer::delay_for(std::time::Duration::from_secs(60)).await;
let weather = get_weather().await;
yield weather; // This is not supported
// Note: waiting for get_weather() stops the timer and avoids overflows.
}
}
有什么方法可以轻松做到这一点?
当 get_weather()
超过 60 秒时,使用 tokio::timer::Interval
将不起作用:
fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
tokio::timer::Interval::new_with_delay(std::time::Duration::from_secs(60))
.then(|| get_weather())
}
如果发生这种情况,下一个功能将立即启动。我想在上一个 get_weather()
开始和下一个 get_weather()
开始之间保持 60 秒。
使用 stream::unfold
从 "world of futures" 转到 "world of streams"。我们不需要任何额外的状态,所以我们使用空元组:
use futures::StreamExt; // 0.3.4
use std::time::Duration;
use tokio::time; // 0.2.11
struct Weather;
async fn get_weather() -> Weather {
Weather
}
const BETWEEN: Duration = Duration::from_secs(1);
fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
futures::stream::unfold((), |_| async {
time::delay_for(BETWEEN).await;
let weather = get_weather().await;
Some((weather, ()))
})
}
#[tokio::main]
async fn main() {
get_weather_stream()
.take(3)
.for_each(|_v| async {
println!("Got the weather");
})
.await;
}
% time ./target/debug/example
Got the weather
Got the weather
Got the weather
real 3.085 3085495us
user 0.004 3928us
sys 0.003 3151us
另请参阅:
- Creating a stream of values while calling async fns?