为什么我的套接字读取锁定了数据的写入?
Why does my socket read lock the write of the data?
我使用 tokio::net::TcpStream 连接一个小型 TCP 服务器,我写了几个字节并期望从服务器读取响应。
当我使用 nc 命令执行此操作时,它工作得很好
[denis@docker-1 ~]$ echo "get" | nc 10.0.0.11 9090
[37e64dd7-91db-4c13-9f89-f1c87467ffb3][processed]
并且服务器日志显示
Incoming peer instructions.
Waiting for peer instructions...
Reading bytes...
Got a few bytes [4]
Got a few bytes [[103, 101, 116, 10, 0, ...]]
Reading bytes...
Got a few bytes [0]
Got a few bytes [[0, 0, 0, 0, 0, 0,...]]
Writing some data back from peer : [37e64dd7-91db-4c13-9f89-f1c87467ffb3]
但是从我的 Rust 客户端,我可以写入字节,但是一旦我想从服务器读取数据,一切都被锁定(甚至写入操作)
use std::collections::HashMap;
use std::ops::DerefMut;
use tokio::io;
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use uuid::Uuid;
use std::sync::RwLock;
use lazy_static::*;
#[tokio::main]
async fn main() {
let data = "set".to_string();
let mut stream = TcpStream::connect("10.0.0.11:9090").await.unwrap();
let ( mut read, mut write) = tokio::io::split(stream);
let u2 = data.as_bytes();
write.write_all(u2).await.unwrap();
let mut msg : [u8;1024] = [0;1024];
let _response_size = read.read(&mut msg).await.unwrap();
println!("GOT = {:?}", msg);
}
查看服务器日志(见下文)时,它读取了客户端发送的 3 个字节,但无法继续读取,等待检测是否还有 0 个字节要读取。
Incoming peer instructions.
Waiting for peer instructions...
Reading bytes...
Got a few bytes [3]
Got a few bytes [[115, 101, 116, 0, 0, ...]]
Reading bytes...
这是服务器代码
use std::collections::HashMap;
use std::ops::DerefMut;
use tokio::io;
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use uuid::Uuid;
use std::sync::RwLock;
use lazy_static::*;
struct DataPool {
data : [u8;1024],
size : usize,
}
async fn whirl_socket( socket : &mut TcpStream ) -> Vec<DataPool> {
let mut pool: Vec<DataPool> = vec![];
let mut buf = [0; 1024];
// In a loop, read data from the socket until finished
loop {
println!("Reading bytes...");
buf = [0; 1024];
let n = match socket.read(&mut buf).await {
Ok(n) => n,
Err(e) => {
eprintln!("failed to read from socket; err = {:?}", e);
break;
}
};
println!("Got a few bytes [{}]", n);
println!("Got a few bytes [{:?}]", &buf);
pool.push(DataPool {
data: buf,
size: n,
});
if n == 0 {
break;
}
}
pool
}
async fn launch_server_listener() -> io::Result<()> {
println!("Listen to 9090...");
let listener = TcpListener::bind("10.0.0.11:9090").await?;
loop {
println!("Waiting for peer instructions...");
let (mut socket, _) = listener.accept().await?;
println!("Incoming peer instructions.");
tokio::spawn(async move {
let mut pool= whirl_socket(&mut socket).await;
let my_uuid = Uuid::new_v4();
// Write the data back
println!("Writing some data back from peer : [{}]", my_uuid);
let s = format!( "[{}][processed]\n", my_uuid.to_string());
let u = s.as_bytes();
if let Err(e) = socket.write_all(u).await {
eprintln!("failed to write to socket; err = {:?}", e);
return;
}
});
}
}
async fn start_servers() -> Result<(), Box<dyn std::error::Error>> {
let _r = tokio::join!(launch_server_listener());
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
start_servers().await?;
Ok(())
}
读取 0
字节意味着读取流已关闭。因此,在您的客户端代码中,您需要关闭写入流。您可以使用 AsyncWriteExt
特征中的 .shutdown()
执行此操作:
write.write_all(u2).await.unwrap();
write.shutdown().await.unwrap();
我使用 tokio::net::TcpStream 连接一个小型 TCP 服务器,我写了几个字节并期望从服务器读取响应。
当我使用 nc 命令执行此操作时,它工作得很好
[denis@docker-1 ~]$ echo "get" | nc 10.0.0.11 9090
[37e64dd7-91db-4c13-9f89-f1c87467ffb3][processed]
并且服务器日志显示
Incoming peer instructions.
Waiting for peer instructions...
Reading bytes...
Got a few bytes [4]
Got a few bytes [[103, 101, 116, 10, 0, ...]]
Reading bytes...
Got a few bytes [0]
Got a few bytes [[0, 0, 0, 0, 0, 0,...]]
Writing some data back from peer : [37e64dd7-91db-4c13-9f89-f1c87467ffb3]
但是从我的 Rust 客户端,我可以写入字节,但是一旦我想从服务器读取数据,一切都被锁定(甚至写入操作)
use std::collections::HashMap;
use std::ops::DerefMut;
use tokio::io;
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use uuid::Uuid;
use std::sync::RwLock;
use lazy_static::*;
#[tokio::main]
async fn main() {
let data = "set".to_string();
let mut stream = TcpStream::connect("10.0.0.11:9090").await.unwrap();
let ( mut read, mut write) = tokio::io::split(stream);
let u2 = data.as_bytes();
write.write_all(u2).await.unwrap();
let mut msg : [u8;1024] = [0;1024];
let _response_size = read.read(&mut msg).await.unwrap();
println!("GOT = {:?}", msg);
}
查看服务器日志(见下文)时,它读取了客户端发送的 3 个字节,但无法继续读取,等待检测是否还有 0 个字节要读取。
Incoming peer instructions.
Waiting for peer instructions...
Reading bytes...
Got a few bytes [3]
Got a few bytes [[115, 101, 116, 0, 0, ...]]
Reading bytes...
这是服务器代码
use std::collections::HashMap;
use std::ops::DerefMut;
use tokio::io;
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use uuid::Uuid;
use std::sync::RwLock;
use lazy_static::*;
struct DataPool {
data : [u8;1024],
size : usize,
}
async fn whirl_socket( socket : &mut TcpStream ) -> Vec<DataPool> {
let mut pool: Vec<DataPool> = vec![];
let mut buf = [0; 1024];
// In a loop, read data from the socket until finished
loop {
println!("Reading bytes...");
buf = [0; 1024];
let n = match socket.read(&mut buf).await {
Ok(n) => n,
Err(e) => {
eprintln!("failed to read from socket; err = {:?}", e);
break;
}
};
println!("Got a few bytes [{}]", n);
println!("Got a few bytes [{:?}]", &buf);
pool.push(DataPool {
data: buf,
size: n,
});
if n == 0 {
break;
}
}
pool
}
async fn launch_server_listener() -> io::Result<()> {
println!("Listen to 9090...");
let listener = TcpListener::bind("10.0.0.11:9090").await?;
loop {
println!("Waiting for peer instructions...");
let (mut socket, _) = listener.accept().await?;
println!("Incoming peer instructions.");
tokio::spawn(async move {
let mut pool= whirl_socket(&mut socket).await;
let my_uuid = Uuid::new_v4();
// Write the data back
println!("Writing some data back from peer : [{}]", my_uuid);
let s = format!( "[{}][processed]\n", my_uuid.to_string());
let u = s.as_bytes();
if let Err(e) = socket.write_all(u).await {
eprintln!("failed to write to socket; err = {:?}", e);
return;
}
});
}
}
async fn start_servers() -> Result<(), Box<dyn std::error::Error>> {
let _r = tokio::join!(launch_server_listener());
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
start_servers().await?;
Ok(())
}
读取 0
字节意味着读取流已关闭。因此,在您的客户端代码中,您需要关闭写入流。您可以使用 AsyncWriteExt
特征中的 .shutdown()
执行此操作:
write.write_all(u2).await.unwrap();
write.shutdown().await.unwrap();