使用 Tokio 处理 TCP 连接时使用回调
Using a callback when handling TCP connections with Tokio
我正在尝试拥有一个启动事件循环、侦听 TCP 连接并为每个连接调用回调的结构。
(回调将从套接字传递一些预先拥有的数据。在我下面的示例中,我只是将连接的 IP 地址传递给它,但在我的真实代码中,我将解析我收到的内容 serde
到结构中并将其传递到回调中。我希望这不会使以下 "not working example").
无效
我的Cargo.toml
:
[package]
name = "lifetime-problem"
version = "0.1.0"
edition = "2018"
[dependencies]
tokio-tcp = "0.1.3"
tokio = "0.1.14"
[[bin]]
name = "lifetime-problem"
path = "main.rs"
和main.rs
:
use tokio::prelude::*;
struct Test {
printer: Option<Box<Fn(std::net::SocketAddr) + Sync>>,
}
impl Test {
pub fn start(&mut self) -> Result<(), Box<std::error::Error>> {
let addr = "127.0.0.1:4242".parse::<std::net::SocketAddr>()?;
let listener = tokio::net::TcpListener::bind(&addr)?;
let server = listener
.incoming()
.map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
.for_each(move |socket: tokio::net::TcpStream| {
let address = socket.peer_addr().expect("");
match self.printer {
Some(callback) => { callback(address); }
None => { println!("{}", address); }
}
Ok(())
});
tokio::run(server);
Ok(())
}
}
fn main() {
let mut x = Test{ printer: None };
x.start();
}
我从这段代码开始尝试了几件事(直接从 Tokio example 中采用)。
如果我使用上面发布的代码,我得到:
error[E0277]: (dyn std::ops::Fn(std::net::SocketAddr) + std::marker::Sync + 'static) cannot be sent between threads safely
第 24 行 (tokio::run(server)
).
如果我在打印机字段 Fn
上添加 Send
特征 XOR 如果我删除 move
在 for_each
调用的闭包中我得到另一个错误:
error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
这让我注意到闭包显然不能比定义它的 start
方法长寿,但 tokio::run
似乎对它有冲突的要求。
你知道我是否以完全错误的方式处理回调模式,或者我的代码中是否只有一些小错误?
要事第一:
除非明确指定生命周期,否则编译器会将 Box<Fn(std::net::SocketAddr) + Sync>
转换为 Box<Fn(std::net::SocketAddr) + Sync + 'static>
。
让我们看看错误:
error[E0277]: (dyn std::ops::Fn(std::net::SocketAddr) + std::marker::Sync + 'static) cannot be sent between threads safely
这是不言自明的。您正试图将 &mut T
移动到另一个线程,但不能,因为这里的 T
不是 Send
。要将 &mut T
发送到另一个线程 T
也需要是 Send
.
类型
这是会产生相同错误的最少代码:
use std::fmt::Debug;
fn func<T> (i:&'static mut T) where T: Debug {
std::thread::spawn(move || {
println!("{:?}", i);
});
}
如果我将上面的 T
也设为 Send
类型,错误就会消失。
但是在你的情况下,当你添加 Send
特征时,它会给出生命周期错误。为什么?
&mut self
的生命周期大于调用者设置的函数 start()
,但不能保证其 'static
。您将此引用移动到传递给线程的闭包中,并且可能会比它正在关闭的范围更长寿,从而导致悬空引用。
这是一个最小版本,它会给出同样的错误。
use std::fmt::Debug;
fn func<'a, T:'a> (i:&'a mut T) where T: Debug + Sync + Send {
std::thread::spawn(move || {
println!("{:?}", i);
});
}
Sync
在这里并不需要,因为它是 &mut T
。将 &mut T
更改为 &T
(保留 Sync
),也会导致相同的错误。这里的责任在于引用而不是可变性。所以你看,有一些生命周期 'a
并且它被移动到一个闭包中(给一个线程),这意味着闭包现在包含一个引用(与主上下文不相交)。那么现在,'a
是什么?从另一个线程调用的闭包的角度来看,它会存在多长时间?不可推论!结果,编译器抱怨说 cannot infer an appropriate lifetime due to conflicting requirements
.
如果我们稍微调整一下代码;
impl Test {
pub fn start(&'static mut self) -> Result<(), Box<std::error::Error>> {
let addr = "127.0.0.1:4242".parse::<std::net::SocketAddr>()?;
let listener = tokio::net::TcpListener::bind(&addr)?;
let server = listener
.incoming()
.map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
.for_each(move |socket: tokio::net::TcpStream| {
let address = socket.peer_addr().expect("");
match &self.printer {
Some(callback) => { callback(address); }
None => { println!("{}", address); }
}
Ok(())
});
tokio::run(server);
Ok(())
}
}
它会编译好的。那里保证 self
有一个 'static
生命周期。请注意,在 match
语句中我们需要传递 &self.printer
,因为您不能移出借用的上下文。
但是,这需要将 Test
声明为静态的,而且也是可变的,如果您有其他选择,这通常不是最好的方法。
另一种方式是;如果可以将 Test
按值传递给 start()
,然后将其进一步移动到 for_each()
,代码将如下所示:
use tokio::prelude::*;
struct Test {
printer: Option<Box<Fn(std::net::SocketAddr) + Send>>,
}
impl Test {
pub fn start(mut self) -> Result<(), Box<std::error::Error>> {
let addr = "127.0.0.1:4242".parse::<std::net::SocketAddr>()?;
let listener = tokio::net::TcpListener::bind(&addr)?;
let server = listener
.incoming()
.map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
.for_each(move |socket: tokio::net::TcpStream| {
let address = socket.peer_addr().expect("");
match &self.printer {
Some(callback) => {
callback(address);
}
None => {
println!("{}", address);
}
}
Ok(())
});
tokio::run(server);
Ok(())
}
}
fn main() {
let mut x = Test { printer: None };
x.start();
}
我正在尝试拥有一个启动事件循环、侦听 TCP 连接并为每个连接调用回调的结构。
(回调将从套接字传递一些预先拥有的数据。在我下面的示例中,我只是将连接的 IP 地址传递给它,但在我的真实代码中,我将解析我收到的内容 serde
到结构中并将其传递到回调中。我希望这不会使以下 "not working example").
我的Cargo.toml
:
[package]
name = "lifetime-problem"
version = "0.1.0"
edition = "2018"
[dependencies]
tokio-tcp = "0.1.3"
tokio = "0.1.14"
[[bin]]
name = "lifetime-problem"
path = "main.rs"
和main.rs
:
use tokio::prelude::*;
struct Test {
printer: Option<Box<Fn(std::net::SocketAddr) + Sync>>,
}
impl Test {
pub fn start(&mut self) -> Result<(), Box<std::error::Error>> {
let addr = "127.0.0.1:4242".parse::<std::net::SocketAddr>()?;
let listener = tokio::net::TcpListener::bind(&addr)?;
let server = listener
.incoming()
.map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
.for_each(move |socket: tokio::net::TcpStream| {
let address = socket.peer_addr().expect("");
match self.printer {
Some(callback) => { callback(address); }
None => { println!("{}", address); }
}
Ok(())
});
tokio::run(server);
Ok(())
}
}
fn main() {
let mut x = Test{ printer: None };
x.start();
}
我从这段代码开始尝试了几件事(直接从 Tokio example 中采用)。
如果我使用上面发布的代码,我得到:
error[E0277]: (dyn std::ops::Fn(std::net::SocketAddr) + std::marker::Sync + 'static) cannot be sent between threads safely
第 24 行 (
tokio::run(server)
).如果我在打印机字段
Fn
上添加Send
特征 XOR 如果我删除move
在for_each
调用的闭包中我得到另一个错误:error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
这让我注意到闭包显然不能比定义它的
start
方法长寿,但tokio::run
似乎对它有冲突的要求。
你知道我是否以完全错误的方式处理回调模式,或者我的代码中是否只有一些小错误?
要事第一:
除非明确指定生命周期,否则编译器会将 Box<Fn(std::net::SocketAddr) + Sync>
转换为 Box<Fn(std::net::SocketAddr) + Sync + 'static>
。
让我们看看错误:
error[E0277]: (dyn std::ops::Fn(std::net::SocketAddr) + std::marker::Sync + 'static) cannot be sent between threads safely
这是不言自明的。您正试图将 &mut T
移动到另一个线程,但不能,因为这里的 T
不是 Send
。要将 &mut T
发送到另一个线程 T
也需要是 Send
.
这是会产生相同错误的最少代码:
use std::fmt::Debug;
fn func<T> (i:&'static mut T) where T: Debug {
std::thread::spawn(move || {
println!("{:?}", i);
});
}
如果我将上面的 T
也设为 Send
类型,错误就会消失。
但是在你的情况下,当你添加 Send
特征时,它会给出生命周期错误。为什么?
&mut self
的生命周期大于调用者设置的函数 start()
,但不能保证其 'static
。您将此引用移动到传递给线程的闭包中,并且可能会比它正在关闭的范围更长寿,从而导致悬空引用。
这是一个最小版本,它会给出同样的错误。
use std::fmt::Debug;
fn func<'a, T:'a> (i:&'a mut T) where T: Debug + Sync + Send {
std::thread::spawn(move || {
println!("{:?}", i);
});
}
Sync
在这里并不需要,因为它是 &mut T
。将 &mut T
更改为 &T
(保留 Sync
),也会导致相同的错误。这里的责任在于引用而不是可变性。所以你看,有一些生命周期 'a
并且它被移动到一个闭包中(给一个线程),这意味着闭包现在包含一个引用(与主上下文不相交)。那么现在,'a
是什么?从另一个线程调用的闭包的角度来看,它会存在多长时间?不可推论!结果,编译器抱怨说 cannot infer an appropriate lifetime due to conflicting requirements
.
如果我们稍微调整一下代码;
impl Test {
pub fn start(&'static mut self) -> Result<(), Box<std::error::Error>> {
let addr = "127.0.0.1:4242".parse::<std::net::SocketAddr>()?;
let listener = tokio::net::TcpListener::bind(&addr)?;
let server = listener
.incoming()
.map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
.for_each(move |socket: tokio::net::TcpStream| {
let address = socket.peer_addr().expect("");
match &self.printer {
Some(callback) => { callback(address); }
None => { println!("{}", address); }
}
Ok(())
});
tokio::run(server);
Ok(())
}
}
它会编译好的。那里保证 self
有一个 'static
生命周期。请注意,在 match
语句中我们需要传递 &self.printer
,因为您不能移出借用的上下文。
但是,这需要将 Test
声明为静态的,而且也是可变的,如果您有其他选择,这通常不是最好的方法。
另一种方式是;如果可以将 Test
按值传递给 start()
,然后将其进一步移动到 for_each()
,代码将如下所示:
use tokio::prelude::*;
struct Test {
printer: Option<Box<Fn(std::net::SocketAddr) + Send>>,
}
impl Test {
pub fn start(mut self) -> Result<(), Box<std::error::Error>> {
let addr = "127.0.0.1:4242".parse::<std::net::SocketAddr>()?;
let listener = tokio::net::TcpListener::bind(&addr)?;
let server = listener
.incoming()
.map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
.for_each(move |socket: tokio::net::TcpStream| {
let address = socket.peer_addr().expect("");
match &self.printer {
Some(callback) => {
callback(address);
}
None => {
println!("{}", address);
}
}
Ok(())
});
tokio::run(server);
Ok(())
}
}
fn main() {
let mut x = Test { printer: None };
x.start();
}