为什么异步 TcpStream 会阻塞?
Why is async TcpStream blocking?
我正在做一个用 Rust 实现分布式键值存储的项目。我使用 Tokio 的异步运行时编写了服务器端代码。我遇到了一个问题:我的异步代码似乎发生了阻塞,因此当我与服务器建立多个连接时,只有一个 TcpStream 会被处理。无论是一般意义上的异步编程还是 Rust 的 async
代码,我都是新手,但我原以为如果某个 TCP 流上没有活动,其他流仍会被接受和处理。
是我对 async 的理解有误还是我对 tokio 的使用不正确?
这是我的入口点:
use std::error::Error;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use env_logger;
use log::{debug, info};
use structopt::StructOpt;
use tokio::net::TcpListener;
extern crate blue;
use blue::ipc::message;
use blue::store::args;
use blue::store::cluster::{Cluster, NodeRole};
use blue::store::deserialize::deserialize_store;
use blue::store::handler::handle_stream;
use blue::store::wal::WriteAheadLog;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let opt = args::Opt::from_args();
let addr = SocketAddr::from_str(format!("{}:{}", opt.host, opt.port).as_str())?;
let role = NodeRole::from_str(opt.role.as_str()).unwrap();
let leader_addr = match role {
NodeRole::Leader => addr,
NodeRole::Follower => SocketAddr::from_str(opt.follow.unwrap().as_str())?,
};
let wal_name = addr.to_string().replace(".", "").replace(":", "");
let wal_full_name = format!("wal{}.log", wal_name);
let wal_path = PathBuf::from(wal_full_name);
let mut wal = match wal_path.exists() {
true => {
info!("Existing WAL found");
WriteAheadLog::open(&wal_path)?
}
false => {
info!("Creating WAL");
WriteAheadLog::new(&wal_path)?
}
};
debug!("WAL: {:?}", wal);
let store_name = addr.to_string().replace(".", "").replace(":", "");
let store_pth = format!("{}.pb", store_name);
let store_path = Path::new(&store_pth);
let mut store = match store_path.exists() {
true => deserialize_store(store_path)?,
false => message::Store::default(),
};
let listener = TcpListener::bind(addr).await?;
let cluster = Cluster::new(addr, &role, leader_addr, &mut wal, &mut store).await?;
let store_path = Arc::new(store_path);
let store = Arc::new(Mutex::new(store));
let wal = Arc::new(Mutex::new(wal));
let cluster = Arc::new(Mutex::new(cluster));
info!("Blue launched. Waiting for incoming connection");
loop {
let (stream, addr) = listener.accept().await?;
info!("Incoming request from {}", addr);
let store = Arc::clone(&store);
let store_path = Arc::clone(&store_path);
let wal = Arc::clone(&wal);
let cluster = Arc::clone(&cluster);
handle_stream(stream, store, store_path, wal, cluster, &role).await?;
}
}
下面是我的处理程序(即上面调用的 handle_stream
)。我省略了 match input
中的所有分支处理代码,因为我认为它们对说明这个问题并非必要(如果确实有帮助,该部分的完整代码在这里:https://github.com/matthewmturner/Bradfield-Distributed-Systems/blob/main/blue/src/store/handler.rs )。
具体来说,阻塞点是这一行:let input = async_read_message::<message::Request>(&mut stream).await;
服务器在此处等待来自客户端或群集中另一台服务器的通信。我目前看到的行为是,在使用客户端连接到服务器后,服务器没有收到任何将其他节点添加到集群的请求——它只处理客户端流。
use std::io;
use std::net::{SocketAddr, TcpStream};
use std::path::Path;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use log::{debug, error, info};
use serde_json::json;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream as asyncTcpStream;
use super::super::ipc::message;
use super::super::ipc::message::request::Command;
use super::super::ipc::receiver::async_read_message;
use super::super::ipc::sender::{async_send_message, send_message};
use super::cluster::{Cluster, NodeRole};
use super::serialize::persist_store;
use super::wal::WriteAheadLog;
// TODO: Why isnt async working? I.e. connecting servers after client is connected stays on client stream.
/// Serves one accepted connection.
///
/// Loops forever: logs the stream, awaits one `message::Request` read from
/// `stream` via `async_read_message`, logs the result, and dispatches on it.
///
/// The `match` arms are elided (`...`) in this excerpt, so how `store`,
/// `store_path`, `wal`, `cluster` and `role` are used, and whether any arm
/// returns from the function, cannot be seen here.
///
/// NOTE(review): the loop has no visible exit, so a caller that `.await`s this
/// function is suspended for as long as the loop keeps running.
pub async fn handle_stream<'a>(
mut stream: asyncTcpStream,
store: Arc<Mutex<message::Store>>,
store_path: Arc<&Path>,
wal: Arc<Mutex<WriteAheadLog<'a>>>,
cluster: Arc<Mutex<Cluster>>,
role: &NodeRole,
) -> io::Result<()> {
loop {
info!("Handling stream: {:?}", stream);
// Suspends this future until a complete request has been read (or the read fails).
let input = async_read_message::<message::Request>(&mut stream).await;
debug!("Input: {:?}", input);
// `input` is an `io::Result`; the per-case handling is omitted from this excerpt.
match input {
...
}
}
}
这是 async_read_message
的代码
/// Reads one length-prefixed message of type `M` from `stream`.
///
/// Wire format as read here: a 4-byte little-endian `i32` length, followed by
/// that many bytes, which are handed to `M::decode`.
///
/// # Errors
///
/// Returns an error if either `read_exact` call fails, if the length prefix is
/// negative (`io::ErrorKind::InvalidData`), or if `M::decode` fails.
pub async fn async_read_message<M: Message + Default>(
    stream: &mut asyncTcpStream,
) -> io::Result<M> {
    let mut len_buf = [0u8; 4];
    debug!("Reading message length");
    stream.read_exact(&mut len_buf).await?;

    let len = i32::from_le_bytes(len_buf);
    // The prefix comes straight off the wire and cannot be trusted: a negative
    // `i32` cast with `as usize` wraps to an enormous size, and the allocation
    // below would then panic instead of reporting a malformed frame.
    if len < 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("negative message length: {}", len),
        ));
    }

    let mut buf = vec![0u8; len as usize];
    debug!("Reading message");
    stream.read_exact(&mut buf).await?;

    let user_input = M::decode(&mut buf.as_slice())?;
    debug!("Received message: {:?}", user_input);
    Ok(user_input)
}
您的问题在于您在客户端连接后如何处理消息:
handle_stream(stream, store, store_path, wal, cluster, &role).await?;
这个 .await
意味着你的监听循环会一直等待 handle_stream
返回,但是(基于一些假设)这个函数在客户端断开连接之前不会返回。你需要的是用 tokio::spawn
生成一个可以独立运行直至完成的新任务:
tokio::spawn(handle_stream(stream, store, store_path, wal, cluster, &role));
您可能需要更改某些参数类型以避免生命周期问题;tokio::spawn
要求任务满足 'static
约束,因为任务的生命周期与生成它的作用域是相互独立的。
我正在做一个用 Rust 实现分布式键值存储的项目。我使用 Tokio 的异步运行时编写了服务器端代码。我遇到了一个问题:我的异步代码似乎发生了阻塞,因此当我与服务器建立多个连接时,只有一个 TcpStream 会被处理。无论是一般意义上的异步编程还是 Rust 的 async
代码,我都是新手,但我原以为如果某个 TCP 流上没有活动,其他流仍会被接受和处理。
是我对 async 的理解有误还是我对 tokio 的使用不正确?
这是我的入口点:
use std::error::Error;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use env_logger;
use log::{debug, info};
use structopt::StructOpt;
use tokio::net::TcpListener;
extern crate blue;
use blue::ipc::message;
use blue::store::args;
use blue::store::cluster::{Cluster, NodeRole};
use blue::store::deserialize::deserialize_store;
use blue::store::handler::handle_stream;
use blue::store::wal::WriteAheadLog;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let opt = args::Opt::from_args();
let addr = SocketAddr::from_str(format!("{}:{}", opt.host, opt.port).as_str())?;
let role = NodeRole::from_str(opt.role.as_str()).unwrap();
let leader_addr = match role {
NodeRole::Leader => addr,
NodeRole::Follower => SocketAddr::from_str(opt.follow.unwrap().as_str())?,
};
let wal_name = addr.to_string().replace(".", "").replace(":", "");
let wal_full_name = format!("wal{}.log", wal_name);
let wal_path = PathBuf::from(wal_full_name);
let mut wal = match wal_path.exists() {
true => {
info!("Existing WAL found");
WriteAheadLog::open(&wal_path)?
}
false => {
info!("Creating WAL");
WriteAheadLog::new(&wal_path)?
}
};
debug!("WAL: {:?}", wal);
let store_name = addr.to_string().replace(".", "").replace(":", "");
let store_pth = format!("{}.pb", store_name);
let store_path = Path::new(&store_pth);
let mut store = match store_path.exists() {
true => deserialize_store(store_path)?,
false => message::Store::default(),
};
let listener = TcpListener::bind(addr).await?;
let cluster = Cluster::new(addr, &role, leader_addr, &mut wal, &mut store).await?;
let store_path = Arc::new(store_path);
let store = Arc::new(Mutex::new(store));
let wal = Arc::new(Mutex::new(wal));
let cluster = Arc::new(Mutex::new(cluster));
info!("Blue launched. Waiting for incoming connection");
loop {
let (stream, addr) = listener.accept().await?;
info!("Incoming request from {}", addr);
let store = Arc::clone(&store);
let store_path = Arc::clone(&store_path);
let wal = Arc::clone(&wal);
let cluster = Arc::clone(&cluster);
handle_stream(stream, store, store_path, wal, cluster, &role).await?;
}
}
下面是我的处理程序(即上面调用的 handle_stream
)。我省略了 match input
中的所有分支处理代码,因为我认为它们对说明这个问题并非必要(如果确实有帮助,该部分的完整代码在这里:https://github.com/matthewmturner/Bradfield-Distributed-Systems/blob/main/blue/src/store/handler.rs )。
具体来说,阻塞点是这一行:let input = async_read_message::<message::Request>(&mut stream).await;
服务器在此处等待来自客户端或群集中另一台服务器的通信。我目前看到的行为是,在使用客户端连接到服务器后,服务器没有收到任何将其他节点添加到集群的请求——它只处理客户端流。
use std::io;
use std::net::{SocketAddr, TcpStream};
use std::path::Path;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use log::{debug, error, info};
use serde_json::json;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream as asyncTcpStream;
use super::super::ipc::message;
use super::super::ipc::message::request::Command;
use super::super::ipc::receiver::async_read_message;
use super::super::ipc::sender::{async_send_message, send_message};
use super::cluster::{Cluster, NodeRole};
use super::serialize::persist_store;
use super::wal::WriteAheadLog;
// TODO: Why isnt async working? I.e. connecting servers after client is connected stays on client stream.
/// Serves one accepted connection.
///
/// Loops forever: logs the stream, awaits one `message::Request` read from
/// `stream` via `async_read_message`, logs the result, and dispatches on it.
///
/// The `match` arms are elided (`...`) in this excerpt, so how `store`,
/// `store_path`, `wal`, `cluster` and `role` are used, and whether any arm
/// returns from the function, cannot be seen here.
///
/// NOTE(review): the loop has no visible exit, so a caller that `.await`s this
/// function is suspended for as long as the loop keeps running.
pub async fn handle_stream<'a>(
mut stream: asyncTcpStream,
store: Arc<Mutex<message::Store>>,
store_path: Arc<&Path>,
wal: Arc<Mutex<WriteAheadLog<'a>>>,
cluster: Arc<Mutex<Cluster>>,
role: &NodeRole,
) -> io::Result<()> {
loop {
info!("Handling stream: {:?}", stream);
// Suspends this future until a complete request has been read (or the read fails).
let input = async_read_message::<message::Request>(&mut stream).await;
debug!("Input: {:?}", input);
// `input` is an `io::Result`; the per-case handling is omitted from this excerpt.
match input {
...
}
}
}
这是 async_read_message 的代码:
/// Reads one length-prefixed message of type `M` from `stream`.
///
/// Wire format as read here: a 4-byte little-endian `i32` length, followed by
/// that many bytes, which are handed to `M::decode`.
///
/// # Errors
///
/// Returns an error if either `read_exact` call fails, if the length prefix is
/// negative (`io::ErrorKind::InvalidData`), or if `M::decode` fails.
pub async fn async_read_message<M: Message + Default>(
    stream: &mut asyncTcpStream,
) -> io::Result<M> {
    let mut len_buf = [0u8; 4];
    debug!("Reading message length");
    stream.read_exact(&mut len_buf).await?;

    let len = i32::from_le_bytes(len_buf);
    // The prefix comes straight off the wire and cannot be trusted: a negative
    // `i32` cast with `as usize` wraps to an enormous size, and the allocation
    // below would then panic instead of reporting a malformed frame.
    if len < 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("negative message length: {}", len),
        ));
    }

    let mut buf = vec![0u8; len as usize];
    debug!("Reading message");
    stream.read_exact(&mut buf).await?;

    let user_input = M::decode(&mut buf.as_slice())?;
    debug!("Received message: {:?}", user_input);
    Ok(user_input)
}
您的问题在于您在客户端连接后如何处理消息:
handle_stream(stream, store, store_path, wal, cluster, &role).await?;
这个 .await
意味着你的监听循环会一直等待 handle_stream
返回,但是(基于一些假设)这个函数在客户端断开连接之前不会返回。你需要的是用 tokio::spawn
生成一个可以独立运行直至完成的新任务:
tokio::spawn(handle_stream(stream, store, store_path, wal, cluster, &role));
您可能需要更改某些参数类型以避免生命周期问题;tokio::spawn
要求任务满足 'static
约束,因为任务的生命周期与生成它的作用域是相互独立的。