超时不会在 AsyncRead 中超时
Timeout doesn't time out in AsyncRead
我正在尝试实现一个将添加读取超时功能的异步读取包装器。 objective 是 API 是普通的 AsyncRead
。换句话说,我不想在代码中到处添加 io.read(buf).timeout(t) 。相反,在给定超时到期后,读取实例本身应该 return 适当的 io::ErrorKind::TimedOut
。
不过我无法将 delay
轮询为就绪。它始终处于待定状态。我试过 async-std
、futures
、smol-timeout
- 结果相同。虽然超时确实在等待时触发,但在轮询时不会触发。我知道超时并不容易。需要一些东西来唤醒它。我究竟做错了什么?如何解决这个问题?
use async_std::{
future::Future,
io,
pin::Pin,
task::{sleep, Context, Poll},
};
use std::time::Duration;
pub struct PrudentIo<IO> {
expired: Option<Pin<Box<dyn Future<Output = ()> + Sync + Send>>>,
timeout: Duration,
io: IO,
}
impl<IO> PrudentIo<IO> {
pub fn new(timeout: Duration, io: IO) -> Self {
PrudentIo {
expired: None,
timeout,
io,
}
}
}
fn delay(t: Duration) -> Option<Pin<Box<dyn Future<Output = ()> + Sync + Send + 'static>>> {
if t.is_zero() {
return None;
}
Some(Box::pin(sleep(t)))
}
impl<IO: io::Read + Unpin> io::Read for PrudentIo<IO> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
if let Some(ref mut expired) = self.expired {
match expired.as_mut().poll(cx) {
Poll::Ready(_) => {
println!("expired ready");
// too much time passed since last read/write
return Poll::Ready(Err(io::ErrorKind::TimedOut.into()));
}
Poll::Pending => {
println!("expired pending");
// in good time
}
}
}
let res = Pin::new(&mut self.io).poll_read(cx, buf);
println!("read {:?}", res);
match res {
Poll::Pending => {
if self.expired.is_none() {
// No data, start checking for a timeout
self.expired = delay(self.timeout);
}
}
Poll::Ready(_) => self.expired = None,
}
res
}
}
impl<IO: io::Write + Unpin> io::Write for PrudentIo<IO> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.io).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_close(cx)
}
}
#[cfg(test)]
mod io_tests {
use super::*;
use async_std::io::ReadExt;
use async_std::prelude::FutureExt;
use async_std::{
io::{copy, Cursor},
net::TcpStream,
};
use std::time::Duration;
#[async_std::test]
async fn fail_read_after_timeout() -> io::Result<()> {
let mut output = b"______".to_vec();
let io = PendIo;
let mut io = PrudentIo::new(Duration::from_millis(5), io);
let mut io = Pin::new(&mut io);
insta::assert_debug_snapshot!(io.read(&mut output[..]).timeout(Duration::from_secs(1)).await,@"Ok(io::Err(timeou))");
Ok(())
}
#[async_std::test]
async fn timeout_expires() {
let later = delay(Duration::from_millis(1)).expect("some").await;
insta::assert_debug_snapshot!(later,@r"()");
}
/// Mock IO always pending
struct PendIo;
impl io::Read for PendIo {
fn poll_read(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
_buf: &mut [u8],
) -> Poll<futures_io::Result<usize>> {
Poll::Pending
}
}
impl io::Write for PendIo {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
_buf: &[u8],
) -> Poll<futures_io::Result<usize>> {
Poll::Pending
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<futures_io::Result<()>> {
Poll::Pending
}
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<futures_io::Result<()>> {
Poll::Pending
}
}
}
异步超时工作如下:
- 你创造了超时的未来。
- 运行时调用
poll
进入超时,它检查超时是否已经过期。
- 如果它已过期,它 returns
Ready
并完成。
- 如果它没有过期,它会以某种方式注册回调,以便在正确的时间过去后调用
cx.waker().wake()
或类似的方法。
- 时间过去后,调用来自#4 的回调,在适当的唤醒器中调用
wake()
,指示运行时再次调用 poll
。
- 这次
poll
会returnReady
。完成!
您的代码存在的问题是您从 poll()
实现内部创建了延迟:self.expired = delay(self.timeout);
。但是你 return Pending
甚至一次都没有轮询超时。这样,就不会在任何地方注册调用 Waker
的回调。没有唤醒,没有超时。
我看到了几种解决方案:
A。不要将 PrudentIo::expired
初始化为 None
,而是直接在构造函数中创建 timeout
。这样,超时将始终在 io
之前至少被轮询一次,并将被唤醒。但是你总是会创建一个超时,即使它实际上并不需要。
B。创建 timeout
时进行递归轮询:
Poll::Pending => {
if self.expired.is_none() {
// No data, start checking for a timeout
self.expired = delay(self.timeout);
return self.poll_read(cx, buf);
}
这将不必要地调用 io
两次,因此它可能不是最优的。
C。创建超时后添加对轮询的调用:
Poll::Pending => {
if self.expired.is_none() {
// No data, start checking for a timeout
self.expired = delay(self.timeout);
self.expired.as_mut().unwrap().as_mut().poll(cx);
}
也许你应该匹配 poll 的输出,以防它 returns Ready
,但是嘿,这是一个新的超时,它可能尚未完成,而且它似乎工作得很好。
// 1. smol used, not async_std.
// 2. IO should be 'static.
// 3. when timeout, read_poll return Poll::Ready::Err(io::ErrorKind::Timeout)
use {
smol::{future::FutureExt, io, ready, Timer},
std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
},
};
// --
pub struct PrudentIo<IO> {
expired: Option<Pin<Box<dyn Future<Output = io::Result<usize>>>>>,
timeout: Duration,
io: IO,
}
impl<IO> PrudentIo<IO> {
pub fn new(timeout: Duration, io: IO) -> Self {
PrudentIo {
expired: None,
timeout,
io,
}
}
}
impl<IO: io::AsyncRead + Unpin + 'static> io::AsyncRead for PrudentIo<IO> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let this = self.get_mut();
loop {
if let Some(expired) = this.expired.as_mut() {
let res = ready!(expired.poll(cx))?;
this.expired.take();
return Ok(res).into();
}
let timeout = this.timeout.clone();
let (io, read_buf) = unsafe {
// Safety: ONLY used in poll_read method.
(&mut *(&mut this.io as *mut IO), &mut *(buf as *mut [u8]))
};
let fut = async move {
let timeout_fut = async {
Timer::after(timeout).await;
io::Result::<usize>::Err(io::ErrorKind::TimedOut.into())
};
let read_fut = io::AsyncReadExt::read(io, read_buf);
let res = read_fut.or(timeout_fut).await;
res
}
.boxed_local();
this.expired = Some(fut);
}
}
}
impl<IO: io::AsyncWrite + Unpin> io::AsyncWrite for PrudentIo<IO> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.io).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_close(cx)
}
}
// This is another solution. I think it is better.
impl<IO: io::AsyncRead + Unpin> io::AsyncRead for PrudentIo<IO> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let this = self.get_mut();
let io = Pin::new(&mut this.io);
if let Poll::Ready(res) = io.poll_read(cx, buf) {
return Poll::Ready(res);
}
loop {
if let Some(expired) = this.expired.as_mut() {
ready!(expired.poll(cx));
this.expired.take();
return Poll::Ready(Err(io::ErrorKind::TimedOut.into()));
}
let timeout = Timer::after(this.timeout);
this.expired = Some(timeout);
}
}
}
我正在尝试实现一个将添加读取超时功能的异步读取包装器。 objective 是 API 是普通的 AsyncRead
。换句话说,我不想在代码中到处添加 io.read(buf).timeout(t) 。相反,在给定超时到期后,读取实例本身应该 return 适当的 io::ErrorKind::TimedOut
。
不过我无法将 delay
轮询为就绪。它始终处于待定状态。我试过 async-std
、futures
、smol-timeout
- 结果相同。虽然超时确实在等待时触发,但在轮询时不会触发。我知道超时并不容易。需要一些东西来唤醒它。我究竟做错了什么?如何解决这个问题?
use async_std::{
future::Future,
io,
pin::Pin,
task::{sleep, Context, Poll},
};
use std::time::Duration;
pub struct PrudentIo<IO> {
expired: Option<Pin<Box<dyn Future<Output = ()> + Sync + Send>>>,
timeout: Duration,
io: IO,
}
impl<IO> PrudentIo<IO> {
pub fn new(timeout: Duration, io: IO) -> Self {
PrudentIo {
expired: None,
timeout,
io,
}
}
}
fn delay(t: Duration) -> Option<Pin<Box<dyn Future<Output = ()> + Sync + Send + 'static>>> {
if t.is_zero() {
return None;
}
Some(Box::pin(sleep(t)))
}
impl<IO: io::Read + Unpin> io::Read for PrudentIo<IO> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
if let Some(ref mut expired) = self.expired {
match expired.as_mut().poll(cx) {
Poll::Ready(_) => {
println!("expired ready");
// too much time passed since last read/write
return Poll::Ready(Err(io::ErrorKind::TimedOut.into()));
}
Poll::Pending => {
println!("expired pending");
// in good time
}
}
}
let res = Pin::new(&mut self.io).poll_read(cx, buf);
println!("read {:?}", res);
match res {
Poll::Pending => {
if self.expired.is_none() {
// No data, start checking for a timeout
self.expired = delay(self.timeout);
}
}
Poll::Ready(_) => self.expired = None,
}
res
}
}
impl<IO: io::Write + Unpin> io::Write for PrudentIo<IO> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.io).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_close(cx)
}
}
#[cfg(test)]
mod io_tests {
use super::*;
use async_std::io::ReadExt;
use async_std::prelude::FutureExt;
use async_std::{
io::{copy, Cursor},
net::TcpStream,
};
use std::time::Duration;
#[async_std::test]
async fn fail_read_after_timeout() -> io::Result<()> {
let mut output = b"______".to_vec();
let io = PendIo;
let mut io = PrudentIo::new(Duration::from_millis(5), io);
let mut io = Pin::new(&mut io);
insta::assert_debug_snapshot!(io.read(&mut output[..]).timeout(Duration::from_secs(1)).await,@"Ok(io::Err(timeou))");
Ok(())
}
#[async_std::test]
async fn timeout_expires() {
let later = delay(Duration::from_millis(1)).expect("some").await;
insta::assert_debug_snapshot!(later,@r"()");
}
/// Mock IO always pending
struct PendIo;
impl io::Read for PendIo {
fn poll_read(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
_buf: &mut [u8],
) -> Poll<futures_io::Result<usize>> {
Poll::Pending
}
}
impl io::Write for PendIo {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
_buf: &[u8],
) -> Poll<futures_io::Result<usize>> {
Poll::Pending
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<futures_io::Result<()>> {
Poll::Pending
}
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<futures_io::Result<()>> {
Poll::Pending
}
}
}
异步超时工作如下:
- 你创造了超时的未来。
- 运行时调用
poll
进入超时,它检查超时是否已经过期。 - 如果它已过期,它 returns
Ready
并完成。 - 如果它没有过期,它会以某种方式注册回调,以便在正确的时间过去后调用
cx.waker().wake()
或类似的方法。 - 时间过去后,调用来自#4 的回调,在适当的唤醒器中调用
wake()
,指示运行时再次调用poll
。 - 这次
poll
会returnReady
。完成!
您的代码存在的问题是您从 poll()
实现内部创建了延迟:self.expired = delay(self.timeout);
。但是你 return Pending
甚至一次都没有轮询超时。这样,就不会在任何地方注册调用 Waker
的回调。没有唤醒,没有超时。
我看到了几种解决方案:
A。不要将 PrudentIo::expired
初始化为 None
,而是直接在构造函数中创建 timeout
。这样,超时将始终在 io
之前至少被轮询一次,并将被唤醒。但是你总是会创建一个超时,即使它实际上并不需要。
B。创建 timeout
时进行递归轮询:
Poll::Pending => {
if self.expired.is_none() {
// No data, start checking for a timeout
self.expired = delay(self.timeout);
return self.poll_read(cx, buf);
}
这将不必要地调用 io
两次,因此它可能不是最优的。
C。创建超时后添加对轮询的调用:
Poll::Pending => {
if self.expired.is_none() {
// No data, start checking for a timeout
self.expired = delay(self.timeout);
self.expired.as_mut().unwrap().as_mut().poll(cx);
}
也许你应该匹配 poll 的输出,以防它 returns Ready
,但是嘿,这是一个新的超时,它可能尚未完成,而且它似乎工作得很好。
// 1. smol used, not async_std.
// 2. IO should be 'static.
// 3. when timeout, read_poll return Poll::Ready::Err(io::ErrorKind::Timeout)
use {
smol::{future::FutureExt, io, ready, Timer},
std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
},
};
// --
pub struct PrudentIo<IO> {
expired: Option<Pin<Box<dyn Future<Output = io::Result<usize>>>>>,
timeout: Duration,
io: IO,
}
impl<IO> PrudentIo<IO> {
pub fn new(timeout: Duration, io: IO) -> Self {
PrudentIo {
expired: None,
timeout,
io,
}
}
}
impl<IO: io::AsyncRead + Unpin + 'static> io::AsyncRead for PrudentIo<IO> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let this = self.get_mut();
loop {
if let Some(expired) = this.expired.as_mut() {
let res = ready!(expired.poll(cx))?;
this.expired.take();
return Ok(res).into();
}
let timeout = this.timeout.clone();
let (io, read_buf) = unsafe {
// Safety: ONLY used in poll_read method.
(&mut *(&mut this.io as *mut IO), &mut *(buf as *mut [u8]))
};
let fut = async move {
let timeout_fut = async {
Timer::after(timeout).await;
io::Result::<usize>::Err(io::ErrorKind::TimedOut.into())
};
let read_fut = io::AsyncReadExt::read(io, read_buf);
let res = read_fut.or(timeout_fut).await;
res
}
.boxed_local();
this.expired = Some(fut);
}
}
}
impl<IO: io::AsyncWrite + Unpin> io::AsyncWrite for PrudentIo<IO> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.io).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_close(cx)
}
}
// This is another solution. I think it is better.
impl<IO: io::AsyncRead + Unpin> io::AsyncRead for PrudentIo<IO> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let this = self.get_mut();
let io = Pin::new(&mut this.io);
if let Poll::Ready(res) = io.poll_read(cx, buf) {
return Poll::Ready(res);
}
loop {
if let Some(expired) = this.expired.as_mut() {
ready!(expired.poll(cx));
this.expired.take();
return Poll::Ready(Err(io::ErrorKind::TimedOut.into()));
}
let timeout = Timer::after(this.timeout);
this.expired = Some(timeout);
}
}
}