Why does Tokio return the error "Cannot drop a runtime in a context where blocking is not allowed"?

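I have a Tokio client that talks to a remote server and is supposed to keep the connection alive permanently. I've implemented the initial authentication handshake and found that when my test terminates, I get an odd panic: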

---- test_connect_without_database stdout ----
thread 'test_connect_without_database' panicked at 'Cannot drop a runtime in a context where blocking is not allowed. This happens when a runtime is dropped from within an asynchronous context.', /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.3.5/src/runtime/blocking/shutdown.rs:51:21

I'm at a complete loss as to what might be causing this, so I don't know what other code to add for context.

Here's my minimal reproducible example (playground):

use std::cell::RefCell;
use std::net::{IpAddr, SocketAddr};
use tokio::net::TcpStream;
use tokio::prelude::*;
use tokio::runtime;

#[derive(PartialEq, Debug)]
pub struct Configuration {
    /// Database username.
    username: String,
    /// Database password.
    password: String,
    /// Database name.
    db_name: String,
    /// IP address for the remote server.
    address: IpAddr,
    /// Remote server port.
    port: u16,
    /// Number of connections to open.
    connections: u16,
}

impl Configuration {
    pub fn new(
        username: &str,
        password: &str,
        db_name: &str,
        address: &str,
        port: u16,
        connections: u16,
    ) -> Result<Configuration, Box<dyn std::error::Error>> {
        let address = address.to_string().parse()?;
        let configuration = Configuration {
            username: username.to_string(),
            password: password.to_string(),
            db_name: db_name.to_string(),
            address,
            port,
            connections,
        };
        Ok(configuration)
    }

    pub fn socket(&self) -> SocketAddr {
        SocketAddr::new(self.address, self.port)
    }
}

#[derive(Debug)]
pub struct Session {
    configuration: Configuration,
    runtime: RefCell<runtime::Runtime>,
}

impl Session {
    fn new(config: Configuration) -> Result<Session, io::Error> {
        let runtime = runtime::Builder::new_multi_thread()
            .worker_threads(6)
            .enable_all()
            .build()?;
        let session = Session {
            configuration: config,
            runtime: RefCell::new(runtime),
        };
        Ok(session)
    }

    fn configuration(&self) -> &Configuration {
        &self.configuration
    }
}

#[derive(Debug)]
pub(crate) struct Connection<'a> {
    /// Session this connection belongs to.
    session: &'a Session,
    /// Connection to the remote server.
    stream: TcpStream,
    /// Flag that indicates whether the connection is live.
    live: bool,
}

impl<'a> Connection<'a> {
    async fn new(session: &Session) -> Result<Connection<'_>, Box<dyn std::error::Error>> {
        let mut stream = TcpStream::connect(session.configuration().socket()).await?;
        let conn = Connection {
            session,
            stream,
            live: true,
        };

        Ok(conn)
    }

    fn live(&self) -> bool {
        self.live
    }
}

#[tokio::test]
async fn test_connect_without_database() -> Result<(), Box<dyn std::error::Error>> {
    let config = Configuration::new("rust", "", "", "127.0.0.1", 2345, 2).unwrap();
    let session = Session::new(config).unwrap();
    let conn = Connection::new(&session).await?;
    assert!(conn.live());
    Ok(())
}

fn main() {
    println!("{}", 65u8 as char)
}

As the error message states:

This happens when a runtime is dropped from within an asynchronous context

You have created nested runtimes:

  1. From tokio::test
  2. From runtime::Builder::new_multi_thread

The second runtime is owned by Session, which is dropped at the end of the async test. You can observe this by skipping the destructor with mem::forget:

#[tokio::test]
async fn test_connect_without_database() -> Result<(), Box<dyn std::error::Error>> {
    let config = Configuration::new("rust", "", "", "127.0.0.1", 2345, 2).unwrap();
    let session = Session::new(config).unwrap();
    // Note that the assert was removed! 
    std::mem::forget(session);

    Ok(())
}

Don't spawn nested runtimes, and don't drop one runtime from within another.

See also: