使用 tokio 0 生成具有非静态生命周期的任务。1.x
Spawning tasks with non-static lifetimes with tokio 0.1.x
我有一个 tokio 核心,它的主要任务是 运行 一个 websocket(客户端)。当我从服务器收到一些消息时,我想执行一个新任务来更新一些数据。下面是一个最小的失败示例:
use tokio_core::reactor::{Core, Handle};
use futures::future::Future;
use futures::future;
struct Client {
handle: Handle,
data: usize,
}
impl Client {
fn update_data(&mut self) {
// spawn a new task that updates the data
self.handle.spawn(future::ok(()).and_then(|x| {
self.data += 1; // error here
future::ok(())
}));
}
}
fn main() {
let mut runtime = Core::new().unwrap();
let mut client = Client {
handle: runtime.handle(),
data: 0,
};
let task = future::ok::<(), ()>(()).and_then(|_| {
// under some conditions (omitted), we update the data
client.update_data();
future::ok::<(), ()>(())
});
runtime.run(task).unwrap();
}
产生此错误的原因:
error[E0477]: the type `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:13:51: 16:10 self:&mut &mut Client]>` does not fulfill the required lifetime
--> src/main.rs:13:21
|
13 | self.handle.spawn(future::ok(()).and_then(|x| {
| ^^^^^
|
= note: type must satisfy the static lifetime
问题是通过句柄产生的新任务需要是静态的。 here 描述了相同的问题。可悲的是,我不清楚如何解决这个问题。即使尝试使用 Arc
和 Mutex
(单线程应用程序确实不需要),我也没有成功。
由于 tokio 领域的发展相当迅速,我想知道当前最好的解决方案是什么。你有什么建议吗?
编辑
Peter Hall 的解决方案适用于上述示例。可悲的是,当我构建失败的示例时,我更改了 tokio reactor,认为它们是相似的。使用 tokio::runtime::current_thread
use futures::future;
use futures::future::Future;
use futures::stream::Stream;
use std::cell::Cell;
use std::rc::Rc;
use tokio::runtime::current_thread::{Builder, Handle};
struct Client {
handle: Handle,
data: Rc<Cell<usize>>,
}
impl Client {
fn update_data(&mut self) {
// spawn a new task that updates the data
let mut data = Rc::clone(&self.data);
self.handle.spawn(future::ok(()).and_then(move |_x| {
data.set(data.get() + 1);
future::ok(())
}));
}
}
fn main() {
// let mut runtime = Core::new().unwrap();
let mut runtime = Builder::new().build().unwrap();
let mut client = Client {
handle: runtime.handle(),
data: Rc::new(Cell::new(1)),
};
let task = future::ok::<(), ()>(()).and_then(|_| {
// under some conditions (omitted), we update the data
client.update_data();
future::ok::<(), ()>(())
});
runtime.block_on(task).unwrap();
}
我获得:
error[E0277]: `std::rc::Rc<std::cell::Cell<usize>>` cannot be sent between threads safely
--> src/main.rs:17:21
|
17 | self.handle.spawn(future::ok(()).and_then(move |_x| {
| ^^^^^ `std::rc::Rc<std::cell::Cell<usize>>` cannot be sent between threads safely
|
= help: within `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]>`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::cell::Cell<usize>>`
= note: required because it appears within the type `[closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]`
= note: required because it appears within the type `futures::future::chain::Chain<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]>`
= note: required because it appears within the type `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]>`
所以在这种情况下我似乎确实需要一个 Arc
和一个 Mutex
,即使整个代码都是单线程的?
在单线程程序中,不需要使用Arc
; Rc
就足够了:
use std::{rc::Rc, cell::Cell};
struct Client {
handle: Handle,
data: Rc<Cell<usize>>,
}
impl Client {
fn update_data(&mut self) {
let data = Rc::clone(&self.data);
self.handle.spawn(future::ok(()).and_then(move |_x| {
data.set(data.get() + 1);
future::ok(())
}));
}
}
关键是您不再需要担心生命周期,因为 Rc
的每个克隆都表现得好像它拥有数据,而不是通过对 self
的引用来访问它。需要内部 Cell
(或 RefCell
用于非 Copy
类型),因为 Rc
不能可变地取消引用,因为它已被克隆。
tokio::runtime::current_thread::Handle
的 spawn
方法要求未来是 Send
,这就是导致问题更新的原因。在 this Tokio Github issue.
中有一个(某种程度上的)解释为什么会出现这种情况
你可以使用tokio::runtime::current_thread::spawn
代替Handle
的方法,它总是运行当前线程的未来,不是[=51] =]要求未来是Send
。您可以替换上面代码中的 self.handle.spawn
,它将正常工作。
如果您需要使用 Handle
上的方法,那么您还需要求助于 Arc
和 Mutex
(或 RwLock
)以满足Send
要求:
use std::sync::{Mutex, Arc};
struct Client {
handle: Handle,
data: Arc<Mutex<usize>>,
}
impl Client {
fn update_data(&mut self) {
let data = Arc::clone(&self.data);
self.handle.spawn(future::ok(()).and_then(move |_x| {
*data.lock().unwrap() += 1;
future::ok(())
}));
}
}
如果您的数据确实是 usize
,您也可以使用 AtomicUsize
而不是 Mutex<usize>
,但我个人认为它使用起来同样笨拙。
我有一个 tokio 核心,它的主要任务是 运行 一个 websocket(客户端)。当我从服务器收到一些消息时,我想执行一个新任务来更新一些数据。下面是一个最小的失败示例:
use tokio_core::reactor::{Core, Handle};
use futures::future::Future;
use futures::future;
struct Client {
handle: Handle,
data: usize,
}
impl Client {
fn update_data(&mut self) {
// spawn a new task that updates the data
self.handle.spawn(future::ok(()).and_then(|x| {
self.data += 1; // error here
future::ok(())
}));
}
}
fn main() {
let mut runtime = Core::new().unwrap();
let mut client = Client {
handle: runtime.handle(),
data: 0,
};
let task = future::ok::<(), ()>(()).and_then(|_| {
// under some conditions (omitted), we update the data
client.update_data();
future::ok::<(), ()>(())
});
runtime.run(task).unwrap();
}
产生此错误的原因:
error[E0477]: the type `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:13:51: 16:10 self:&mut &mut Client]>` does not fulfill the required lifetime
--> src/main.rs:13:21
|
13 | self.handle.spawn(future::ok(()).and_then(|x| {
| ^^^^^
|
= note: type must satisfy the static lifetime
问题是通过句柄产生的新任务需要是静态的。 here 描述了相同的问题。可悲的是,我不清楚如何解决这个问题。即使尝试使用 Arc
和 Mutex
(单线程应用程序确实不需要),我也没有成功。
由于 tokio 领域的发展相当迅速,我想知道当前最好的解决方案是什么。你有什么建议吗?
编辑
Peter Hall 的解决方案适用于上述示例。可悲的是,当我构建失败的示例时,我更改了 tokio reactor,认为它们是相似的。使用 tokio::runtime::current_thread
use futures::future;
use futures::future::Future;
use futures::stream::Stream;
use std::cell::Cell;
use std::rc::Rc;
use tokio::runtime::current_thread::{Builder, Handle};
struct Client {
handle: Handle,
data: Rc<Cell<usize>>,
}
impl Client {
fn update_data(&mut self) {
// spawn a new task that updates the data
let mut data = Rc::clone(&self.data);
self.handle.spawn(future::ok(()).and_then(move |_x| {
data.set(data.get() + 1);
future::ok(())
}));
}
}
fn main() {
// let mut runtime = Core::new().unwrap();
let mut runtime = Builder::new().build().unwrap();
let mut client = Client {
handle: runtime.handle(),
data: Rc::new(Cell::new(1)),
};
let task = future::ok::<(), ()>(()).and_then(|_| {
// under some conditions (omitted), we update the data
client.update_data();
future::ok::<(), ()>(())
});
runtime.block_on(task).unwrap();
}
我获得:
error[E0277]: `std::rc::Rc<std::cell::Cell<usize>>` cannot be sent between threads safely
--> src/main.rs:17:21
|
17 | self.handle.spawn(future::ok(()).and_then(move |_x| {
| ^^^^^ `std::rc::Rc<std::cell::Cell<usize>>` cannot be sent between threads safely
|
= help: within `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]>`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::cell::Cell<usize>>`
= note: required because it appears within the type `[closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]`
= note: required because it appears within the type `futures::future::chain::Chain<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]>`
= note: required because it appears within the type `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]>`
所以在这种情况下我似乎确实需要一个 Arc
和一个 Mutex
,即使整个代码都是单线程的?
在单线程程序中,不需要使用Arc
; Rc
就足够了:
use std::{rc::Rc, cell::Cell};
struct Client {
handle: Handle,
data: Rc<Cell<usize>>,
}
impl Client {
fn update_data(&mut self) {
let data = Rc::clone(&self.data);
self.handle.spawn(future::ok(()).and_then(move |_x| {
data.set(data.get() + 1);
future::ok(())
}));
}
}
关键是您不再需要担心生命周期,因为 Rc
的每个克隆都表现得好像它拥有数据,而不是通过对 self
的引用来访问它。需要内部 Cell
(或 RefCell
用于非 Copy
类型),因为 Rc
不能可变地取消引用,因为它已被克隆。
tokio::runtime::current_thread::Handle
的 spawn
方法要求未来是 Send
,这就是导致问题更新的原因。在 this Tokio Github issue.
你可以使用tokio::runtime::current_thread::spawn
代替Handle
的方法,它总是运行当前线程的未来,不是[=51] =]要求未来是Send
。您可以替换上面代码中的 self.handle.spawn
,它将正常工作。
如果您需要使用 Handle
上的方法,那么您还需要求助于 Arc
和 Mutex
(或 RwLock
)以满足Send
要求:
use std::sync::{Mutex, Arc};
struct Client {
handle: Handle,
data: Arc<Mutex<usize>>,
}
impl Client {
fn update_data(&mut self) {
let data = Arc::clone(&self.data);
self.handle.spawn(future::ok(()).and_then(move |_x| {
*data.lock().unwrap() += 1;
future::ok(())
}));
}
}
如果您的数据确实是 usize
,您也可以使用 AtomicUsize
而不是 Mutex<usize>
,但我个人认为它使用起来同样笨拙。