如何处理 tokio::spawn 需要'static 和 &self 的闭包?
How to deal with tokio::spawn closure required to be 'static and &self?
我无法理解如何编写封装在单一结构中的并发异步代码。
我不确定如何准确地解释这个问题,所以我会尝试用一个例子来解释。
假设我有一个 UdpServer
结构。此结构有多个与其行为相关的方法(例如,handle_datagram
、deserialize_datagram
等)
如果我想让代码并发,我将生成 tokio 任务,它要求提供给它的闭包是静态的,这意味着只要 &self
是,我就不能从这个任务中调用 &self
不是静态的,这意味着我不能调用 self.serialize_datagram()
.
我理解这个问题(不能保证结构会比线程长寿),但看不到解决它的正确方法。我知道可以将函数移出 impl,但这对我来说不是一个好的解决方案。
此外,即使我们暂时假设我 可以 将 &self
视为静态的,出于某种原因,这段代码在我看来仍然不正确(不够生锈,我猜猜)。
另一个“解决方案”是取 self: Arc<Self>
而不是 &self
,但这感觉更糟。
所以我假设有一些我不知道的模式。
有人可以向我解释一下我应该如何重构整个事情吗?
示例代码:
struct UdpServer {}
impl UdpServer {
pub async fn run(&self) {
let socket = UdpSocket::bind(self.addr).await.unwrap();
loop {
let mut buf: &mut [u8] = &mut [];
let (_, _) = socket.recv_from(&mut buf).await.unwrap();
// I spawn tokio task to enable concurrency
tokio::spawn(async move {
// But i can't use &self in here because it's not static.
let datagram = self.deserialize_datagram(buf).await;
self.handle_datagram(()).await;
});
}
}
pub async fn deserialize_datagram(&self, buf: &mut [u8]) -> Datagram {
unimplemented!()
}
pub async fn handle_datagram(&self, datagram: Datagram) {
unimplemented!()
}
}
目前唯一的方法是通过使用Arc
使self
持续任意长。由于 run()
是 UdpServer
上的一种方法,它需要更改为 Arc<Self>
,您考虑过但拒绝了,因为感觉更糟。不过,这就是这样做的方式:
pub async fn run(self: Arc<Self>) {
let socket = UdpSocket::bind(&self.addr).await.unwrap();
loop {
let mut buf: &mut [u8] = &mut [];
let (_, _) = socket.recv_from(&mut buf).await.unwrap();
tokio::spawn({
let me = Arc::clone(&self);
async move {
let datagram = me.deserialize_datagram(buf).await;
me.handle_datagram(datagram).await;
}
});
}
}
有趣的是,smol async runtime 可能实际上提供了您正在寻找的东西,因为它的执行者拥有一生。该生命周期与来自调用者环境的值相关联,并且在执行者上产生的未来可能会引用它。例如,这样编译:
use futures_lite::future;
use smol::{Executor, net::UdpSocket};
struct Datagram;
struct UdpServer {
addr: String,
}
impl UdpServer {
pub async fn run<'a>(&'a self, ex: &Executor<'a>) {
let socket = UdpSocket::bind(&self.addr).await.unwrap();
loop {
let mut buf: &mut [u8] = &mut [];
let (_, _) = socket.recv_from(&mut buf).await.unwrap();
ex.spawn({
async move {
let datagram = self.deserialize_datagram(buf).await;
self.handle_datagram(datagram).await;
}
}).detach();
}
}
pub async fn deserialize_datagram(&self, _buf: &mut [u8]) -> Datagram {
unimplemented!()
}
pub async fn handle_datagram(&self, _datagram: Datagram) {
unimplemented!()
}
}
fn main() {
let server = UdpServer { addr: "127.0.0.1:8080".to_string() };
let ex = Executor::new();
future::block_on(server.run(&ex));
}
你完全正确。 tokio tutorial 提到了这个解决方案:
If a single piece of data must be accessible from more than one task concurrently, then it must be shared using synchronization primitives such as Arc.
我无法理解如何编写封装在单一结构中的并发异步代码。
我不确定如何准确地解释这个问题,所以我会尝试用一个例子来解释。
假设我有一个 UdpServer
结构。此结构有多个与其行为相关的方法(例如,handle_datagram
、deserialize_datagram
等)
如果我想让代码并发,我将生成 tokio 任务,它要求提供给它的闭包是静态的,这意味着只要 &self
是,我就不能从这个任务中调用 &self
不是静态的,这意味着我不能调用 self.serialize_datagram()
.
我理解这个问题(不能保证结构会比线程长寿),但看不到解决它的正确方法。我知道可以将函数移出 impl,但这对我来说不是一个好的解决方案。
此外,即使我们暂时假设我 可以 将 &self
视为静态的,出于某种原因,这段代码在我看来仍然不正确(不够生锈,我猜猜)。
另一个“解决方案”是取 self: Arc<Self>
而不是 &self
,但这感觉更糟。
所以我假设有一些我不知道的模式。 有人可以向我解释一下我应该如何重构整个事情吗?
示例代码:
struct UdpServer {}
impl UdpServer {
pub async fn run(&self) {
let socket = UdpSocket::bind(self.addr).await.unwrap();
loop {
let mut buf: &mut [u8] = &mut [];
let (_, _) = socket.recv_from(&mut buf).await.unwrap();
// I spawn tokio task to enable concurrency
tokio::spawn(async move {
// But i can't use &self in here because it's not static.
let datagram = self.deserialize_datagram(buf).await;
self.handle_datagram(()).await;
});
}
}
pub async fn deserialize_datagram(&self, buf: &mut [u8]) -> Datagram {
unimplemented!()
}
pub async fn handle_datagram(&self, datagram: Datagram) {
unimplemented!()
}
}
目前唯一的方法是通过使用Arc
使self
持续任意长。由于 run()
是 UdpServer
上的一种方法,它需要更改为 Arc<Self>
,您考虑过但拒绝了,因为感觉更糟。不过,这就是这样做的方式:
pub async fn run(self: Arc<Self>) {
let socket = UdpSocket::bind(&self.addr).await.unwrap();
loop {
let mut buf: &mut [u8] = &mut [];
let (_, _) = socket.recv_from(&mut buf).await.unwrap();
tokio::spawn({
let me = Arc::clone(&self);
async move {
let datagram = me.deserialize_datagram(buf).await;
me.handle_datagram(datagram).await;
}
});
}
}
有趣的是,smol async runtime 可能实际上提供了您正在寻找的东西,因为它的执行者拥有一生。该生命周期与来自调用者环境的值相关联,并且在执行者上产生的未来可能会引用它。例如,这样编译:
use futures_lite::future;
use smol::{Executor, net::UdpSocket};
struct Datagram;
struct UdpServer {
addr: String,
}
impl UdpServer {
pub async fn run<'a>(&'a self, ex: &Executor<'a>) {
let socket = UdpSocket::bind(&self.addr).await.unwrap();
loop {
let mut buf: &mut [u8] = &mut [];
let (_, _) = socket.recv_from(&mut buf).await.unwrap();
ex.spawn({
async move {
let datagram = self.deserialize_datagram(buf).await;
self.handle_datagram(datagram).await;
}
}).detach();
}
}
pub async fn deserialize_datagram(&self, _buf: &mut [u8]) -> Datagram {
unimplemented!()
}
pub async fn handle_datagram(&self, _datagram: Datagram) {
unimplemented!()
}
}
fn main() {
let server = UdpServer { addr: "127.0.0.1:8080".to_string() };
let ex = Executor::new();
future::block_on(server.run(&ex));
}
你完全正确。 tokio tutorial 提到了这个解决方案:
If a single piece of data must be accessible from more than one task concurrently, then it must be shared using synchronization primitives such as Arc.