从 UdpSocket 异步读取

Async read from UdpSocket

我正在尝试在 Tokio 中同时处理到达的 UDP 数据包。然而,以下 MWE 没有达到我的预期:

extern crate futures;
extern crate tokio_core;
extern crate tokio_io;

use futures::{Future, Stream};
use std::net::SocketAddr;
use tokio_core::net::{UdpCodec, UdpSocket};
use tokio_core::reactor::Core;

// just a codec to send and receive bytes
pub struct LineCodec;
impl UdpCodec for LineCodec {
    type In = (SocketAddr, Vec<u8>);
    type Out = (SocketAddr, Vec<u8>);

    fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> std::io::Result<Self::In> {
        Ok((*addr, buf.to_vec()))
    }

    fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> SocketAddr {
        into.extend(buf);
        addr
    }
}

fn compute(addr: SocketAddr, msg: Vec<u8>) -> Box<Future<Item = (), Error = ()>> {
    println!("Starting to compute for: {}", addr);
    // sleep is a placeholder for a long computation
    std::thread::sleep(std::time::Duration::from_secs(8));
    println!("Done computing for for: {}", addr);
    Box::new(futures::future::ok(()))
}

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let listening_addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
    let socket = UdpSocket::bind(&listening_addr, &handle).unwrap();
    println!("Listening on: {}", socket.local_addr().unwrap());

    let (writer, reader) = socket.framed(LineCodec).split();

    let socket_read = reader.for_each(|(addr, msg)| {
        println!("Got {:?}", msg);
        handle.spawn(compute(addr, msg));
        Ok(())
    });

    core.run(socket_read).unwrap();
}

$ nc -u localhost 8080 连接两个终端并发送一些文本,我可以看到来自第二个终端的消息是在第一个终端完成后处理的。

我需要更改什么?

从不 sleep 在异步代码中(也避免任何其他阻塞调用)。

您可能想像这样使用 Timeout

Playground

fn compute(handle: &Handle, addr: SocketAddr, _msg: Vec<u8>) -> Box<Future<Item = (), Error = ()>> {
    println!("Starting to compute for: {}", addr);
    Box::new(
        Timeout::new(std::time::Duration::from_secs(8), handle)
            .unwrap()
            .map_err(|e| panic!("timeout failed: {:?}", e))
            .and_then(move |()| {
                println!("Done computing for for: {}", addr);
                Ok(())
            }),
    )
}

正如@Stefan 在另一个回答中所说,您不应该阻塞异步代码。鉴于您的示例, sleep 看起来像是一些长时间计算的占位符。因此,您应该将该计算委托给另一个线程,而不是使用超时,例如 this example:

extern crate futures;
extern crate futures_cpupool;

use futures::Future;
use futures_cpupool::CpuPool;

...

let pool = CpuPool::new_num_cpus();

...

fn compute(handle: &Handle, addr: SocketAddr, _msg: Vec<u8>) -> Box<Future<Item = (), Error = ()>> {
    // I don't know enough about Tokio to know how to make `pool` available here
    pool.spawn_fn (|| {
        println!("Starting to compute for: {}", addr);
        std::thread::sleep(std::time::Duration::from_secs(8));
        println!("Done computing for for: {}", addr);
        Ok(())
    })
}