实践中实现Future时如何使用Context和Wakers
How to use Context and Wakers when implementing Future in practice
我发现很难理解为什么以及何时我需要明确地对 Context
and/or 做一些事情,它的 Waker
传递给 poll
方法我正在为其实施 Future
的对象。我一直在阅读 Tokio and the Async Book 的文档,但我觉得 examples/methods 过于抽象,无法应用于实际问题。
例如,我原以为下面的 MRE 会死锁,因为 new_inner_task
生成的 future 不知道何时在 MPSC 通道上传递了消息,但是,这个例子似乎工作正常。为什么会这样?
use std::{future::Future, pin::Pin, task::{Context, Poll}, time::Duration};
use futures::{FutureExt, StreamExt}; // 0.3
use tokio::sync::mpsc; // 1.2
use tokio_stream::wrappers::UnboundedReceiverStream; // 0.1
async fn new_inner_task(rx: mpsc::UnboundedReceiver<()>) {
let mut requests = UnboundedReceiverStream::new(rx);
while let Some(_) = requests.next().await {
eprintln!("received request");
}
}
pub struct ActiveObject(Pin<Box<dyn Future<Output = ()> + Send>>);
impl ActiveObject {
pub fn new() -> (Self, mpsc::UnboundedSender<()>) {
let (tx, rx) = mpsc::unbounded_channel();
(Self(new_inner_task(rx).boxed()), tx)
}
}
impl Future for ActiveObject {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
eprintln!("[polled]");
self.get_mut().0.as_mut().poll(cx)
}
}
async fn delayed_send(delay: u64, sender: mpsc::UnboundedSender<()>) {
tokio::time::sleep(Duration::from_millis(delay)).await;
sender.send(()).unwrap();
eprintln!("sent request");
}
#[tokio::main]
async fn main() {
let (obj, tx) = ActiveObject::new();
let ds = delayed_send(500, tx.clone());
let ds2 = delayed_send(1000, tx);
tokio::join!(obj, ds, ds2);
}
我从 运行 这个例子本地得到的输出是:
[polled]
[polled]
sent request
[polled]
received request
[polled]
sent request
[polled]
received request
所以,虽然我没有对 Context
或 Waker
做任何事情,ActiveObject
似乎以合理的速度接受轮询,也就是说,比要求的更频繁,但是不忙等待。是什么导致 ActiveObject
以这种速度被唤醒 up/polled?
您正在将相同的 Context
(因此 Waker
)传递给 new_inner_task
返回的 Future 的 poll()
方法,该方法将其向下传递到链中UnboundedReceiverStream::next()
返回的 Future
的 poll()
。该实现安排在适当的时间(当新元素出现在频道中时)在此 Waker
上调用 wake()
。完成后,Tokio 将轮询与此相关的顶级未来 Waker
- 三个未来的 join!()
。
如果您省略了轮询内部任务的行而只是返回 Poll::Pending
,您将得到预期的情况,您的 Future
将被轮询一次,然后永远“挂起”,因为没有什么会再次唤醒它。
我发现很难理解为什么以及何时我需要明确地对 Context
and/or 做一些事情,它的 Waker
传递给 poll
方法我正在为其实施 Future
的对象。我一直在阅读 Tokio and the Async Book 的文档,但我觉得 examples/methods 过于抽象,无法应用于实际问题。
例如,我原以为下面的 MRE 会死锁,因为 new_inner_task
生成的 future 不知道何时在 MPSC 通道上传递了消息,但是,这个例子似乎工作正常。为什么会这样?
use std::{future::Future, pin::Pin, task::{Context, Poll}, time::Duration};
use futures::{FutureExt, StreamExt}; // 0.3
use tokio::sync::mpsc; // 1.2
use tokio_stream::wrappers::UnboundedReceiverStream; // 0.1
async fn new_inner_task(rx: mpsc::UnboundedReceiver<()>) {
let mut requests = UnboundedReceiverStream::new(rx);
while let Some(_) = requests.next().await {
eprintln!("received request");
}
}
pub struct ActiveObject(Pin<Box<dyn Future<Output = ()> + Send>>);
impl ActiveObject {
pub fn new() -> (Self, mpsc::UnboundedSender<()>) {
let (tx, rx) = mpsc::unbounded_channel();
(Self(new_inner_task(rx).boxed()), tx)
}
}
impl Future for ActiveObject {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
eprintln!("[polled]");
self.get_mut().0.as_mut().poll(cx)
}
}
async fn delayed_send(delay: u64, sender: mpsc::UnboundedSender<()>) {
tokio::time::sleep(Duration::from_millis(delay)).await;
sender.send(()).unwrap();
eprintln!("sent request");
}
#[tokio::main]
async fn main() {
let (obj, tx) = ActiveObject::new();
let ds = delayed_send(500, tx.clone());
let ds2 = delayed_send(1000, tx);
tokio::join!(obj, ds, ds2);
}
我从 运行 这个例子本地得到的输出是:
[polled]
[polled]
sent request
[polled]
received request
[polled]
sent request
[polled]
received request
所以,虽然我没有对 Context
或 Waker
做任何事情,ActiveObject
似乎以合理的速度接受轮询,也就是说,比要求的更频繁,但是不忙等待。是什么导致 ActiveObject
以这种速度被唤醒 up/polled?
您正在将相同的 Context
(因此 Waker
)传递给 new_inner_task
返回的 Future 的 poll()
方法,该方法将其向下传递到链中UnboundedReceiverStream::next()
返回的 Future
的 poll()
。该实现安排在适当的时间(当新元素出现在频道中时)在此 Waker
上调用 wake()
。完成后,Tokio 将轮询与此相关的顶级未来 Waker
- 三个未来的 join!()
。
如果您省略了轮询内部任务的行而只是返回 Poll::Pending
,您将得到预期的情况,您的 Future
将被轮询一次,然后永远“挂起”,因为没有什么会再次唤醒它。