如何处理 tokio::spawn 需要'static 和 &self 的闭包?

How to deal with tokio::spawn closure required to be 'static and &self?

我无法理解如何编写封装在单一结构中的并发异步代码。

我不确定如何准确地解释这个问题,所以我会尝试用一个例子来解释。

假设我有一个 UdpServer 结构。此结构有多个与其行为相关的方法(例如,handle_datagramdeserialize_datagram 等)
如果我想让代码并发,我将生成 tokio 任务,它要求提供给它的闭包是静态的,这意味着只要 &self 是,我就不能从这个任务中调用 &self不是静态的,这意味着我不能调用 self.serialize_datagram().

我理解这个问题(不能保证结构会比线程长寿),但看不到解决它的正确方法。我知道可以将函数移出 impl,但这对我来说不是一个好的解决方案。
此外,即使我们暂时假设我 可以 &self 视为静态的,出于某种原因,这段代码在我看来仍然不正确(不够生锈,我猜猜)。
另一个“解决方案”是取 self: Arc<Self> 而不是 &self,但这感觉更糟。

所以我假设有一些我不知道的模式。 有人可以向我解释一下我应该如何重构整个事情吗?

示例代码:

struct UdpServer {}
impl UdpServer {
    pub async fn run(&self) {
        let socket = UdpSocket::bind(self.addr).await.unwrap();
        loop {
            let mut buf: &mut [u8] = &mut [];
            let (_, _) = socket.recv_from(&mut buf).await.unwrap();

            // I spawn tokio task to enable concurrency
            tokio::spawn(async move {
                // But i can't use &self in here because it's not static.
                let datagram = self.deserialize_datagram(buf).await;
                self.handle_datagram(()).await;
            });
        }
    }

    pub async fn deserialize_datagram(&self, buf: &mut [u8]) -> Datagram {
        unimplemented!()
    }

    pub async fn handle_datagram(&self, datagram: Datagram) {
        unimplemented!()
    }
}

目前唯一的方法是通过使用Arc使self持续任意长。由于 run()UdpServer 上的一种方法,它需要更改为 Arc<Self>,您考虑过但拒绝了,因为感觉更糟。不过,这就是这样做的方式:

pub async fn run(self: Arc<Self>) {
    let socket = UdpSocket::bind(&self.addr).await.unwrap();
    loop {
        let mut buf: &mut [u8] = &mut [];
        let (_, _) = socket.recv_from(&mut buf).await.unwrap();

        tokio::spawn({
            let me = Arc::clone(&self);
            async move {
                let datagram = me.deserialize_datagram(buf).await;
                me.handle_datagram(datagram).await;
            }
        });
    }
}

Playground

有趣的是,smol async runtime 可能实际上提供了您正在寻找的东西,因为它的执行者拥有一生。该生命周期与来自调用者环境的值相关联,并且在执行者上产生的未来可能会引用它。例如,这样编译:

use futures_lite::future;
use smol::{Executor, net::UdpSocket};

struct Datagram;

struct UdpServer {
    addr: String,
}

impl UdpServer {
    pub async fn run<'a>(&'a self, ex: &Executor<'a>) {
        let socket = UdpSocket::bind(&self.addr).await.unwrap();
        loop {
            let mut buf: &mut [u8] = &mut [];
            let (_, _) = socket.recv_from(&mut buf).await.unwrap();

            ex.spawn({
                async move {
                    let datagram = self.deserialize_datagram(buf).await;
                    self.handle_datagram(datagram).await;
                }
            }).detach();
        }
    }

    pub async fn deserialize_datagram(&self, _buf: &mut [u8]) -> Datagram {
        unimplemented!()
    }

    pub async fn handle_datagram(&self, _datagram: Datagram) {
        unimplemented!()
    }
}

fn main() {
    let server = UdpServer { addr: "127.0.0.1:8080".to_string() };
    let ex = Executor::new();
    future::block_on(server.run(&ex));
}

你完全正确。 tokio tutorial 提到了这个解决方案:

If a single piece of data must be accessible from more than one task concurrently, then it must be shared using synchronization primitives such as Arc.