超级服务器在 Future 返回 Async::NotReady 时断开连接
Hyper server drops connection on returning Async::NotReady in Future
我正在尝试 运行 一个超级服务器,它使用未来对请求进行异步响应。当 future 的 poll
方法被调用并且 returns Async::NotReady
时,连接被丢弃 ("dropping I/O source: 0")。我预计 poll
方法会被多次调用,直到 returns Async::Ready
。
example shown returns 异步 io 未来正在做(我猜)同样的事情。
为什么 future 的 poll
函数只被调用一次,为什么 hyper 在 future returns Async::NotReady
之后断开连接?
示例代码:(超级版本为:v0.12.21)
use futures::{Async, Future, Poll};
use hyper::http::{Request, Response};
use hyper::service::service_fn;
use hyper::{Body, Server};
fn main() {
let addr = ([127, 0, 0, 1], 3335).into();
println!("Start request handler. (Listening on http://{})", addr);
hyper::rt::run(
Server::bind(&addr)
.serve(|| service_fn(|request: Request<Body>| handle_request(request.uri().path())))
.map_err(|e| println!("server error: {}", e)),
);
}
type BoxedResponseFuture = Box<Future<Item = Response<Body>, Error = tokio::io::Error> + Send>;
fn handle_request(path: &str) -> BoxedResponseFuture {
println!("Handle request {:?}", path);
Box::new(
ResponseFuture { ready: false }
.and_then(|_| {
let response = Response::new(Body::from("Success".to_string()));
Ok(response)
})
.or_else(|e| {
let response = Response::new(Body::from(format!("Error: {:?}", e)));
Ok(response)
}),
)
}
struct ResponseFuture {
ready: bool,
}
impl Future for ResponseFuture {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
println!("Poll future");
if self.ready {
println!("Future ready");
return Ok(Async::Ready(()));
}
println!("Future not ready");
self.ready = true;
Ok(Async::NotReady)
}
}
Hyper 建立在 futures crate 之上,并使用其称为 "readiness" 或 "pull" 的未来模型,其中值是按需从 futures 中提取的,否则任务是 通知值可能已准备好取出。
当poll
returnsNotReady
时,当前任务必须注册一个就绪变化通知,否则任务可能永远不会完全的。 returns Async
必须遵守的任何功能。
换句话说,你应该等到poll
可以return Ready
或者通知当前任务表明它已经准备好进行并return NotReady
// notify about progress
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
println!("Poll future");
if self.ready {
println!("Future ready");
return Ok(Async::Ready(()));
}
println!("Future not ready");
self.ready = true;
// The executor will poll this task next iteration
futures::task::current().notify();
Ok(Async::NotReady)
}
// wait until it is Ready
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
println!("Poll future");
if self.ready {
println!("Future ready");
return Ok(Async::Ready(()));
}
println!("Future not ready");
self.ready = true;
}
}
我正在尝试 运行 一个超级服务器,它使用未来对请求进行异步响应。当 future 的 poll
方法被调用并且 returns Async::NotReady
时,连接被丢弃 ("dropping I/O source: 0")。我预计 poll
方法会被多次调用,直到 returns Async::Ready
。
example shown returns 异步 io 未来正在做(我猜)同样的事情。
为什么 future 的 poll
函数只被调用一次,为什么 hyper 在 future returns Async::NotReady
之后断开连接?
示例代码:(超级版本为:v0.12.21)
use futures::{Async, Future, Poll};
use hyper::http::{Request, Response};
use hyper::service::service_fn;
use hyper::{Body, Server};
fn main() {
let addr = ([127, 0, 0, 1], 3335).into();
println!("Start request handler. (Listening on http://{})", addr);
hyper::rt::run(
Server::bind(&addr)
.serve(|| service_fn(|request: Request<Body>| handle_request(request.uri().path())))
.map_err(|e| println!("server error: {}", e)),
);
}
type BoxedResponseFuture = Box<Future<Item = Response<Body>, Error = tokio::io::Error> + Send>;
fn handle_request(path: &str) -> BoxedResponseFuture {
println!("Handle request {:?}", path);
Box::new(
ResponseFuture { ready: false }
.and_then(|_| {
let response = Response::new(Body::from("Success".to_string()));
Ok(response)
})
.or_else(|e| {
let response = Response::new(Body::from(format!("Error: {:?}", e)));
Ok(response)
}),
)
}
struct ResponseFuture {
ready: bool,
}
impl Future for ResponseFuture {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
println!("Poll future");
if self.ready {
println!("Future ready");
return Ok(Async::Ready(()));
}
println!("Future not ready");
self.ready = true;
Ok(Async::NotReady)
}
}
Hyper 建立在 futures crate 之上,并使用其称为 "readiness" 或 "pull" 的未来模型,其中值是按需从 futures 中提取的,否则任务是 通知值可能已准备好取出。
当poll
returnsNotReady
时,当前任务必须注册一个就绪变化通知,否则任务可能永远不会完全的。 returns Async
必须遵守的任何功能。
换句话说,你应该等到poll
可以return Ready
或者通知当前任务表明它已经准备好进行并return NotReady
// notify about progress
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
println!("Poll future");
if self.ready {
println!("Future ready");
return Ok(Async::Ready(()));
}
println!("Future not ready");
self.ready = true;
// The executor will poll this task next iteration
futures::task::current().notify();
Ok(Async::NotReady)
}
// wait until it is Ready
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
println!("Poll future");
if self.ready {
println!("Future ready");
return Ok(Async::Ready(()));
}
println!("Future not ready");
self.ready = true;
}
}