为什么我的套接字读取锁定了数据的写入?

Why does my socket read lock the write of the data?

我使用 tokio::net::TcpStream 连接一个小型 TCP 服务器,我写了几个字节并期望从服务器读取响应。

当我使用 nc 命令执行此操作时,它工作得很好

[denis@docker-1 ~]$ echo "get" | nc 10.0.0.11 9090
[37e64dd7-91db-4c13-9f89-f1c87467ffb3][processed]

并且服务器日志显示

Incoming  peer instructions.
Waiting for peer instructions...
Reading bytes...
Got a few bytes [4]
Got a few bytes [[103, 101, 116, 10, 0, ...]]
Reading bytes...
Got a few bytes [0]
Got a few bytes [[0, 0, 0, 0, 0, 0,...]]
Writing some data back from peer : [37e64dd7-91db-4c13-9f89-f1c87467ffb3]

但是从我的 Rust 客户端,我可以写入字节,但是一旦我想从服务器读取数据,一切都被锁定(甚至写入操作)

use std::collections::HashMap;
use std::ops::DerefMut;
use tokio::io;
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use uuid::Uuid;
use std::sync::RwLock;
use lazy_static::*;

#[tokio::main]
async fn main() {
    let data = "set".to_string();
    let mut stream = TcpStream::connect("10.0.0.11:9090").await.unwrap();
    let ( mut read, mut write) = tokio::io::split(stream);

    let u2 = data.as_bytes();
    write.write_all(u2).await.unwrap();

    let mut msg : [u8;1024] = [0;1024];
    let _response_size = read.read(&mut msg).await.unwrap();
    println!("GOT = {:?}", msg);
}

查看服务器日志(见下文)时,它读取了客户端发送的 3 个字节,但无法继续读取,等待检测是否还有 0 个字节要读取。

Incoming  peer instructions.
Waiting for peer instructions...
Reading bytes...
Got a few bytes [3]
Got a few bytes [[115, 101, 116, 0, 0, ...]]
Reading bytes...

这是服务器代码

use std::collections::HashMap;
use std::ops::DerefMut;

use tokio::io;
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use uuid::Uuid;
use std::sync::RwLock;
use lazy_static::*;

struct DataPool {
    data : [u8;1024],
    size : usize,
}

async fn whirl_socket( socket : &mut TcpStream ) -> Vec<DataPool> {
    let mut pool: Vec<DataPool> = vec![];
    let mut buf = [0; 1024];

    // In a loop, read data from the socket until finished
    loop {
        println!("Reading bytes...");
        buf = [0; 1024];
        let n = match socket.read(&mut buf).await {
            Ok(n) => n,
            Err(e) => {
                eprintln!("failed to read from socket; err = {:?}", e);
                break;
            }
        };
        println!("Got a few bytes [{}]", n);
        println!("Got a few bytes [{:?}]", &buf);
        pool.push(DataPool {
            data: buf,
            size: n,
        });
        if n == 0 {
            break;
        }
    }
    pool
}


async fn launch_server_listener() -> io::Result<()> {

    println!("Listen to 9090...");
    let listener = TcpListener::bind("10.0.0.11:9090").await?;

    loop {
        println!("Waiting for peer instructions...");
        let (mut socket, _) = listener.accept().await?;
        println!("Incoming  peer instructions.");

        tokio::spawn(async move {
            let mut pool= whirl_socket(&mut socket).await;

            let my_uuid = Uuid::new_v4();

            // Write the data back
            println!("Writing some data back from peer : [{}]",  my_uuid);
            let s = format!( "[{}][processed]\n", my_uuid.to_string());
            let u = s.as_bytes();
            if let Err(e) = socket.write_all(u).await {
                eprintln!("failed to write to socket; err = {:?}", e);
                return;
            }

        });
    }

}


async fn start_servers() -> Result<(), Box<dyn std::error::Error>> {
    let _r = tokio::join!(launch_server_listener());
    Ok(())
}


#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    start_servers().await?;
    Ok(())
}

读取 0 字节意味着读取流已关闭。因此,在您的客户端代码中,您需要关闭写入流。您可以使用 AsyncWriteExt 特征中的 .shutdown() 执行此操作:

write.write_all(u2).await.unwrap();
write.shutdown().await.unwrap();