实践中实现Future时如何使用Context和Wakers

How to use Context and Wakers when implementing Future in practice

我发现很难理解为什么以及何时我需要明确地对 Context and/or 做一些事情,它的 Waker 传递给 poll 方法我正在为其实施 Future 的对象。我一直在阅读 Tokio and the Async Book 的文档,但我觉得 examples/methods 过于抽象,无法应用于实际问题。

例如,我原以为下面的 MRE 会死锁,因为 new_inner_task 生成的 future 不知道何时在 MPSC 通道上传递了消息,但是,这个例子似乎工作正常。为什么会这样?

use std::{future::Future, pin::Pin, task::{Context, Poll}, time::Duration};
use futures::{FutureExt, StreamExt}; // 0.3
use tokio::sync::mpsc; // 1.2
use tokio_stream::wrappers::UnboundedReceiverStream; // 0.1

async fn new_inner_task(rx: mpsc::UnboundedReceiver<()>) {
    let mut requests = UnboundedReceiverStream::new(rx);
    while let Some(_) = requests.next().await {
        eprintln!("received request");
    }
}

pub struct ActiveObject(Pin<Box<dyn Future<Output = ()> + Send>>);

impl ActiveObject {
    pub fn new() -> (Self, mpsc::UnboundedSender<()>) {
        let (tx, rx) = mpsc::unbounded_channel();
        (Self(new_inner_task(rx).boxed()), tx)
    }
}

impl Future for ActiveObject {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        eprintln!("[polled]");
        self.get_mut().0.as_mut().poll(cx)
    }
}

async fn delayed_send(delay: u64, sender: mpsc::UnboundedSender<()>) {
    tokio::time::sleep(Duration::from_millis(delay)).await;
    sender.send(()).unwrap();
    eprintln!("sent request");
}

#[tokio::main]
async fn main() {
    let (obj, tx) = ActiveObject::new();
    let ds = delayed_send(500, tx.clone());
    let ds2 = delayed_send(1000, tx);
    
    tokio::join!(obj, ds, ds2);
}

我从 运行 这个例子本地得到的输出是:

[polled]
[polled]
sent request
[polled]
received request
[polled]
sent request
[polled]
received request

所以,虽然我没有对 ContextWaker 做任何事情,ActiveObject 似乎以合理的速度接受轮询,也就是说,比要求的更频繁,但是不忙等待。是什么导致 ActiveObject 以这种速度被唤醒 up/polled?

您正在将相同的 Context(因此 Waker)传递给 new_inner_task 返回的 Future 的 poll() 方法,该方法将其向下传递到链中UnboundedReceiverStream::next() 返回的 Futurepoll()。该实现安排在适当的时间(当新元素出现在频道中时)在此 Waker 上调用 wake()。完成后,Tokio 将轮询与此相关的顶级未来 Waker - 三个未来的 join!()

如果您省略了轮询内部任务的行而只是返回 Poll::Pending,您将得到预期的情况,您的 Future 将被轮询一次,然后永远“挂起”,因为没有什么会再次唤醒它。