如何将 Tokio 设置为多线程 UDP 服务器?

How to set up Tokio as a multi-threaded UDP server?

到目前为止,我找到的唯一示例是来自 Tokio 存储库的单线程示例 Echo UDP。如何使用 Tokio 启动一个循环来生成一个新线程来处理新的 UDP 连接。

original version of the question

的答案

How does one set up Tokio 0.2 to “listen” for UDP data?

use tokio::net::UdpSocket; // "0.2.20", features = ["full"]

type Error = Box<dyn std::error::Error>;
type Result<T, E = Error> = std::result::Result<T, E>;

#[tokio::main]
async fn main() -> Result<()> {
    let mut socket = UdpSocket::bind("127.0.0.1:9999").await?;

    loop {
        let mut data = [0; 1024];
        let valid_bytes = socket.recv(&mut data).await?;
        let data = &data[..valid_bytes];

        eprintln!("Read {} bytes", data.len());
    }
}

(Tokio 1.4 的代码实际上是相同的,只是删除 mut 限定符。)

一个window:

% cargo run
Read 6 bytes
Read 5 bytes
Read 6 bytes

在另一个:

% nc -u 127.0.0.1 9999
alpha
beta
gamma

current version of the question

的答案

How to set up Tokio as a multi-threaded UDP server?

上面的代码多线程的; Tokio is multithreaded by default。您可能希望创建并发(也可能是并行)工作;这可以通过 生成任务:

来完成

要在 tokio 中创建可安排的单元,您应该使用 tokio::task::spawn。如果底层运行时是多线程的,那么这些单元将由多个线程完成。

您可以通过向示例中添加几行代码来了解它是如何工作的

fn main() {
...
    let jh = tokio::task::spawn(server.run());
    println!("udp server started {:?}", std::thread::current().id());
    jh.await?;
...
}
fn run
... 
   loop {
        if let Some((size, peer)) = to_send {
            let amt = socket.send_to(&buf[..size], &peer).await?;
            println!("eched back {:?}", std::thread::current().id());
        }

        to_send = Some(socket.recv_from(&mut buf).await?);
        println!("read some stuff {:?}", std::thread::current().id());
    }