Why is async TcpStream blocking?

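I'm working on a project that implements a distributed key-value store in Rust. I've written the server-side code using Tokio's asynchronous runtime. I'm running into an issue where my async code appears to be blocking: when I open multiple connections to the server, only one TcpStream is processed. I'm new to async code, both in general and in Rust, but I thought that if there was no activity on a given TCP stream, other streams would still be accepted and processed.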

Is my understanding of async wrong, or am I using tokio incorrectly?

This is my entry point:

use std::error::Error;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::{Arc, Mutex};

use env_logger;
use log::{debug, info};
use structopt::StructOpt;
use tokio::net::TcpListener;

extern crate blue;

use blue::ipc::message;
use blue::store::args;
use blue::store::cluster::{Cluster, NodeRole};
use blue::store::deserialize::deserialize_store;
use blue::store::handler::handle_stream;
use blue::store::wal::WriteAheadLog;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();

    let opt = args::Opt::from_args();
    let addr = SocketAddr::from_str(format!("{}:{}", opt.host, opt.port).as_str())?;
    let role = NodeRole::from_str(opt.role.as_str()).unwrap();
    let leader_addr = match role {
        NodeRole::Leader => addr,
        NodeRole::Follower => SocketAddr::from_str(opt.follow.unwrap().as_str())?,
    };

    let wal_name = addr.to_string().replace(".", "").replace(":", "");
    let wal_full_name = format!("wal{}.log", wal_name);
    let wal_path = PathBuf::from(wal_full_name);
    let mut wal = match wal_path.exists() {
        true => {
            info!("Existing WAL found");
            WriteAheadLog::open(&wal_path)?
        }
        false => {
            info!("Creating WAL");
            WriteAheadLog::new(&wal_path)?
        }
    };
    debug!("WAL: {:?}", wal);

    let store_name = addr.to_string().replace(".", "").replace(":", "");
    let store_pth = format!("{}.pb", store_name);
    let store_path = Path::new(&store_pth);
    let mut store = match store_path.exists() {
        true => deserialize_store(store_path)?,
        false => message::Store::default(),
    };

    let listener = TcpListener::bind(addr).await?;
    let cluster = Cluster::new(addr, &role, leader_addr, &mut wal, &mut store).await?;

    let store_path = Arc::new(store_path);
    let store = Arc::new(Mutex::new(store));

    let wal = Arc::new(Mutex::new(wal));
    let cluster = Arc::new(Mutex::new(cluster));
    info!("Blue launched. Waiting for incoming connection");

    loop {
        let (stream, addr) = listener.accept().await?;
        info!("Incoming request from {}", addr);
        let store = Arc::clone(&store);
        let store_path = Arc::clone(&store_path);
        let wal = Arc::clone(&wal);
        let cluster = Arc::clone(&cluster);
        handle_stream(stream, store, store_path, wal, cluster, &role).await?;
    }
}

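Below is my handler (the handle_stream called above). I've left out all the handlers inside match input because I don't think they are needed to demonstrate the point (the full code for that part is here, in case it helps: https://github.com/matthewmturner/Bradfield-Distributed-Systems/blob/main/blue/src/store/handler.rs).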

Specifically, the blocking point is the line let input = async_read_message::<message::Request>(&mut stream).await;

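The server waits here for communication from a client or from another server in the cluster. The behavior I currently see is that once a client has connected to the server, the server no longer receives any requests to add other nodes to the cluster; it only handles the client stream.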

use std::io;
use std::net::{SocketAddr, TcpStream};
use std::path::Path;
use std::str::FromStr;
use std::sync::{Arc, Mutex};

use log::{debug, error, info};
use serde_json::json;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream as asyncTcpStream;

use super::super::ipc::message;
use super::super::ipc::message::request::Command;
use super::super::ipc::receiver::async_read_message;
use super::super::ipc::sender::{async_send_message, send_message};
use super::cluster::{Cluster, NodeRole};
use super::serialize::persist_store;
use super::wal::WriteAheadLog;

// TODO: Why isn't async working? I.e. connecting servers after the client is connected stays on the client stream.
pub async fn handle_stream<'a>(
    mut stream: asyncTcpStream,
    store: Arc<Mutex<message::Store>>,
    store_path: Arc<&Path>,
    wal: Arc<Mutex<WriteAheadLog<'a>>>,
    cluster: Arc<Mutex<Cluster>>,
    role: &NodeRole,
) -> io::Result<()> {
    loop {
        info!("Handling stream: {:?}", stream);
        let input = async_read_message::<message::Request>(&mut stream).await;
        debug!("Input: {:?}", input);
        match input {
        ...
        }
    }
}

Here is the code for async_read_message:

pub async fn async_read_message<M: Message + Default>(
    stream: &mut asyncTcpStream,
) -> io::Result<M> {
    let mut len_buf = [0u8; 4];
    debug!("Reading message length");
    stream.read_exact(&mut len_buf).await?;
    let len = i32::from_le_bytes(len_buf);
    let mut buf = vec![0u8; len as usize];
    debug!("Reading message");
    stream.read_exact(&mut buf).await?;
    let user_input = M::decode(&mut buf.as_slice())?;
    debug!("Received message: {:?}", user_input);
    Ok(user_input)
}

Your problem lies in how you handle messages after a client connects:

handle_stream(stream, store, store_path, wal, cluster, &role).await?;

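This .await means your listening loop waits for handle_stream to return, but (making some assumptions) that function won't return until the client has disconnected. In the meantime the loop never gets back to listener.accept(), so no other connection is picked up. What you want is to tokio::spawn a new task that can run to completion independently: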

tokio::spawn(handle_stream(stream, store, store_path, wal, cluster, &role));

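You may need to change some of the parameter types to avoid lifetimes; tokio::spawn requires 'static because the task's lifetime is decoupled from the scope that spawned it. In the code above, that affects the borrowed &role, the Arc<&Path>, and the 'a lifetime on WriteAheadLog.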