为什么 poll() 中的这个 Delay future 在我的自定义 Stream 类型中不起作用?
Why does this Delay future inside poll() not work in my custom Stream type?
我想每秒打印一次"Hello"。
引用文档:
Futures use a poll based model. The consumer of a future repeatedly calls the poll function. The future then attempts to complete. If the future is able to complete, it returns Async::Ready(value). If the future is unable to complete due to being blocked on an internal resource (such as a TCP socket), it returns Async::NotReady.
我的 poll
函数 returns NotReady
如果 Delay
s return 是 NotReady
,但没有任何内容打印到标准输出。
use futures::{Async, Future, Stream}; // 0.1.25
use std::time::{Duration, Instant};
use tokio::timer::Delay; // 0.1.15
struct SomeStream;
impl Stream for SomeStream {
type Item = String;
type Error = ();
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
let when = Instant::now() + Duration::from_millis(1000);
let mut task = Delay::new(when).map_err(|e| eprintln!("{:?}", e));
match task.poll() {
Ok(Async::Ready(value)) => {}
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => return Err(()),
}
Ok(Async::Ready(Some("Hello".to_string())))
}
}
fn main() {
let s = SomeStream;
let future = s
.for_each(|item| {
println!("{:?}", item);
Ok(())
})
.map_err(|e| {});
tokio::run(future);
}
这里的主要问题是缺少状态管理。每次轮询流时,您都在创建一个新的 Delay
未来,而不是一直保留到它被解决。
这将导致永远看不到任何项目从流中出来,因为这些期货只被轮询一次,每次都可能产生 NotReady
。
您需要在您的类型中跟踪延迟未来 SomeStream
。在这种情况下,可以使用一个选项,以便也确定我们是否需要创建一个新的延迟。
#[derive(Debug, Default)]
struct SomeStream {
delay: Option<Delay>,
}
SomeStream::poll
的后续代码,具有更好的错误处理和更惯用的构造,将变成如下所示:
impl Stream for SomeStream {
type Item = String;
type Error = Box<dyn std::error::Error + Send + Sync>; // generic error
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
let delay = self.delay.get_or_insert_with(|| {
let when = Instant::now() + Duration::from_millis(1000);
Delay::new(when)
});
match delay.poll() {
Ok(Async::Ready(value)) => {
self.delay = None;
Ok(Async::Ready(Some("Hello".to_string())))
},
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Err(err.into()),
}
}
}
或者,更好的是,使用 try_ready!
宏,这使得 return 错误和 NotReady
信号具有更少的样板文件。
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
let delay = self.delay.get_or_insert_with(|| {
let when = Instant::now() + Duration::from_millis(1000);
Delay::new(when)
});
try_ready!(delay.poll());
// tick!
self.delay = None;
Ok(Async::Ready(Some("Hello".to_string())))
}
我想每秒打印一次"Hello"。
引用文档:
Futures use a poll based model. The consumer of a future repeatedly calls the poll function. The future then attempts to complete. If the future is able to complete, it returns Async::Ready(value). If the future is unable to complete due to being blocked on an internal resource (such as a TCP socket), it returns Async::NotReady.
我的 poll
函数 returns NotReady
如果 Delay
s return 是 NotReady
,但没有任何内容打印到标准输出。
use futures::{Async, Future, Stream}; // 0.1.25
use std::time::{Duration, Instant};
use tokio::timer::Delay; // 0.1.15
struct SomeStream;
impl Stream for SomeStream {
type Item = String;
type Error = ();
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
let when = Instant::now() + Duration::from_millis(1000);
let mut task = Delay::new(when).map_err(|e| eprintln!("{:?}", e));
match task.poll() {
Ok(Async::Ready(value)) => {}
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => return Err(()),
}
Ok(Async::Ready(Some("Hello".to_string())))
}
}
fn main() {
let s = SomeStream;
let future = s
.for_each(|item| {
println!("{:?}", item);
Ok(())
})
.map_err(|e| {});
tokio::run(future);
}
这里的主要问题是缺少状态管理。每次轮询流时,您都在创建一个新的 Delay
未来,而不是一直保留到它被解决。
这将导致永远看不到任何项目从流中出来,因为这些期货只被轮询一次,每次都可能产生 NotReady
。
您需要在您的类型中跟踪延迟未来 SomeStream
。在这种情况下,可以使用一个选项,以便也确定我们是否需要创建一个新的延迟。
#[derive(Debug, Default)]
struct SomeStream {
delay: Option<Delay>,
}
SomeStream::poll
的后续代码,具有更好的错误处理和更惯用的构造,将变成如下所示:
impl Stream for SomeStream {
type Item = String;
type Error = Box<dyn std::error::Error + Send + Sync>; // generic error
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
let delay = self.delay.get_or_insert_with(|| {
let when = Instant::now() + Duration::from_millis(1000);
Delay::new(when)
});
match delay.poll() {
Ok(Async::Ready(value)) => {
self.delay = None;
Ok(Async::Ready(Some("Hello".to_string())))
},
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Err(err.into()),
}
}
}
或者,更好的是,使用 try_ready!
宏,这使得 return 错误和 NotReady
信号具有更少的样板文件。
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
let delay = self.delay.get_or_insert_with(|| {
let when = Instant::now() + Duration::from_millis(1000);
Delay::new(when)
});
try_ready!(delay.poll());
// tick!
self.delay = None;
Ok(Async::Ready(Some("Hello".to_string())))
}