使用 Tokio 处理 TCP 连接时使用回调

Using a callback when handling TCP connections with Tokio

我正在尝试拥有一个启动事件循环、侦听 TCP 连接并为每个连接调用回调的结构。

(回调将从套接字传递一些预先拥有的数据。在我下面的示例中,我只是将连接的 IP 地址传递给它,但在我的真实代码中,我将解析我收到的内容 serde 到结构中并将其传递到回调中。我希望这不会使以下 "not working example").

无效

我的Cargo.toml:

[package]
name = "lifetime-problem"
version = "0.1.0"
edition = "2018"
[dependencies]
tokio-tcp = "0.1.3"
tokio = "0.1.14"
[[bin]]
name = "lifetime-problem"
path = "main.rs"

main.rs:

use tokio::prelude::*;

struct Test {
    printer: Option<Box<Fn(std::net::SocketAddr) + Sync>>,
}

impl Test {
    pub fn start(&mut self) -> Result<(), Box<std::error::Error>> {
        let addr = "127.0.0.1:4242".parse::<std::net::SocketAddr>()?;
        let listener = tokio::net::TcpListener::bind(&addr)?;
        let server = listener
            .incoming()
            .map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
            .for_each(move |socket: tokio::net::TcpStream| {
                let address = socket.peer_addr().expect("");
                match self.printer {
                    Some(callback) => { callback(address); }
                    None => { println!("{}", address); }
                }
                Ok(())
            });
        tokio::run(server);
        Ok(())
    }
}

fn main() {
    let mut x = Test{ printer: None };
    x.start();
}

我从这段代码开始尝试了几件事(直接从 Tokio example 中采用)。

  1. 如果我使用上面发布的代码,我得到:

    error[E0277]: (dyn std::ops::Fn(std::net::SocketAddr) + std::marker::Sync + 'static) cannot be sent between threads safely
    

    第 24 行 (tokio::run(server)).

  2. 如果我在打印机字段 Fn 上添加 Send 特征 XOR 如果我删除 movefor_each 调用的闭包中我得到另一个错误:

    error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
    

    这让我注意到闭包显然不能比定义它的 start 方法长寿,但 tokio::run 似乎对它有冲突的要求。

你知道我是否以完全错误的方式处理回调模式,或者我的代码中是否只有一些小错误?

要事第一:

除非明确指定生命周期,否则编译器会将 Box<Fn(std::net::SocketAddr) + Sync> 转换为 Box<Fn(std::net::SocketAddr) + Sync + 'static>

让我们看看错误:

error[E0277]: (dyn std::ops::Fn(std::net::SocketAddr) + std::marker::Sync + 'static) cannot be sent between threads safely

这是不言自明的。您正试图将 &mut T 移动到另一个线程,但不能,因为这里的 T 不是 Send。要将 &mut T 发送到另一个线程 T 也需要是 Send.

类型

这是会产生相同错误的最少代码:

use std::fmt::Debug;

fn func<T> (i:&'static mut T) where T: Debug {
    std::thread::spawn(move || {
        println!("{:?}", i);
    });
}

如果我将上面的 T 也设为 Send 类型,错误就会消失。 但是在你的情况下,当你添加 Send 特征时,它会给出生命周期错误。为什么?

&mut self 的生命周期大于调用者设置的函数 start(),但不能保证其 'static。您将此引用移动到传递给线程的闭包中,并且可能会比它正在关闭的范围更长寿,从而导致悬空引用。

这是一个最小版本,它会给出同样的错误。

use std::fmt::Debug;

fn func<'a, T:'a> (i:&'a mut T) where T: Debug + Sync + Send {
    std::thread::spawn(move || {
        println!("{:?}", i);
    });
}

Sync 在这里并不需要,因为它是 &mut T。将 &mut T 更改为 &T(保留 Sync),也会导致相同的错误。这里的责任在于引用而不是可变性。所以你看,有一些生命周期 'a 并且它被移动到一个闭包中(给一个线程),这意味着闭包现在包含一个引用(与主上下文不相交)。那么现在,'a 是什么?从另一个线程调用的闭包的角度来看,它会存在多长时间?不可推论!结果,编译器抱怨说 cannot infer an appropriate lifetime due to conflicting requirements.

如果我们稍微调整一下代码;

impl Test {
    pub fn start(&'static mut self) -> Result<(), Box<std::error::Error>> {
        let addr = "127.0.0.1:4242".parse::<std::net::SocketAddr>()?;
        let listener = tokio::net::TcpListener::bind(&addr)?;
        let server = listener
            .incoming()
            .map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
            .for_each(move |socket: tokio::net::TcpStream| {
                let address = socket.peer_addr().expect("");
                match &self.printer {
                    Some(callback) => { callback(address); }
                    None => { println!("{}", address); }
                }
                Ok(())
            });
        tokio::run(server);
        Ok(())
    }
}

它会编译好的。那里保证 self 有一个 'static 生命周期。请注意,在 match 语句中我们需要传递 &self.printer,因为您不能移出借用的上下文。

但是,这需要将 Test 声明为静态的,而且也是可变的,如果您有其他选择,这通常不是最好的方法。

另一种方式是;如果可以将 Test 按值传递给 start(),然后将其进一步移动到 for_each(),代码将如下所示:

use tokio::prelude::*;

struct Test {
    printer: Option<Box<Fn(std::net::SocketAddr) + Send>>,
}

impl Test {
    pub fn start(mut self) -> Result<(), Box<std::error::Error>> {
        let addr = "127.0.0.1:4242".parse::<std::net::SocketAddr>()?;
        let listener = tokio::net::TcpListener::bind(&addr)?;
        let server = listener
            .incoming()
            .map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
            .for_each(move |socket: tokio::net::TcpStream| {
                let address = socket.peer_addr().expect("");
                match &self.printer {
                    Some(callback) => {
                        callback(address);
                    }
                    None => {
                        println!("{}", address);
                    }
                }
                Ok(())
            });
        tokio::run(server);
        Ok(())
    }
}

fn main() {
    let mut x = Test { printer: None };
    x.start();
}