从 UdpSocket 异步读取
Async read from UdpSocket
我正在尝试在 Tokio 中同时处理到达的 UDP 数据包。然而,以下 MWE 没有达到我的预期:
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
use futures::{Future, Stream};
use std::net::SocketAddr;
use tokio_core::net::{UdpCodec, UdpSocket};
use tokio_core::reactor::Core;
// just a codec to send and receive bytes
pub struct LineCodec;
impl UdpCodec for LineCodec {
type In = (SocketAddr, Vec<u8>);
type Out = (SocketAddr, Vec<u8>);
fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> std::io::Result<Self::In> {
Ok((*addr, buf.to_vec()))
}
fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> SocketAddr {
into.extend(buf);
addr
}
}
fn compute(addr: SocketAddr, msg: Vec<u8>) -> Box<Future<Item = (), Error = ()>> {
println!("Starting to compute for: {}", addr);
// sleep is a placeholder for a long computation
std::thread::sleep(std::time::Duration::from_secs(8));
println!("Done computing for for: {}", addr);
Box::new(futures::future::ok(()))
}
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let listening_addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
let socket = UdpSocket::bind(&listening_addr, &handle).unwrap();
println!("Listening on: {}", socket.local_addr().unwrap());
let (writer, reader) = socket.framed(LineCodec).split();
let socket_read = reader.for_each(|(addr, msg)| {
println!("Got {:?}", msg);
handle.spawn(compute(addr, msg));
Ok(())
});
core.run(socket_read).unwrap();
}
用 $ nc -u localhost 8080
连接两个终端并发送一些文本,我可以看到来自第二个终端的消息是在第一个终端完成后处理的。
我需要更改什么?
从不 sleep
在异步代码中(也避免任何其他阻塞调用)。
您可能想像这样使用 Timeout
:
fn compute(handle: &Handle, addr: SocketAddr, _msg: Vec<u8>) -> Box<Future<Item = (), Error = ()>> {
println!("Starting to compute for: {}", addr);
Box::new(
Timeout::new(std::time::Duration::from_secs(8), handle)
.unwrap()
.map_err(|e| panic!("timeout failed: {:?}", e))
.and_then(move |()| {
println!("Done computing for for: {}", addr);
Ok(())
}),
)
}
正如@Stefan 在另一个回答中所说,您不应该阻塞异步代码。鉴于您的示例, sleep
看起来像是一些长时间计算的占位符。因此,您应该将该计算委托给另一个线程,而不是使用超时,例如 this example:
extern crate futures;
extern crate futures_cpupool;
use futures::Future;
use futures_cpupool::CpuPool;
...
let pool = CpuPool::new_num_cpus();
...
fn compute(handle: &Handle, addr: SocketAddr, _msg: Vec<u8>) -> Box<Future<Item = (), Error = ()>> {
// I don't know enough about Tokio to know how to make `pool` available here
pool.spawn_fn (|| {
println!("Starting to compute for: {}", addr);
std::thread::sleep(std::time::Duration::from_secs(8));
println!("Done computing for for: {}", addr);
Ok(())
})
}
我正在尝试在 Tokio 中同时处理到达的 UDP 数据包。然而,以下 MWE 没有达到我的预期:
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
use futures::{Future, Stream};
use std::net::SocketAddr;
use tokio_core::net::{UdpCodec, UdpSocket};
use tokio_core::reactor::Core;
// just a codec to send and receive bytes
pub struct LineCodec;
impl UdpCodec for LineCodec {
type In = (SocketAddr, Vec<u8>);
type Out = (SocketAddr, Vec<u8>);
fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> std::io::Result<Self::In> {
Ok((*addr, buf.to_vec()))
}
fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> SocketAddr {
into.extend(buf);
addr
}
}
fn compute(addr: SocketAddr, msg: Vec<u8>) -> Box<Future<Item = (), Error = ()>> {
println!("Starting to compute for: {}", addr);
// sleep is a placeholder for a long computation
std::thread::sleep(std::time::Duration::from_secs(8));
println!("Done computing for for: {}", addr);
Box::new(futures::future::ok(()))
}
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let listening_addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
let socket = UdpSocket::bind(&listening_addr, &handle).unwrap();
println!("Listening on: {}", socket.local_addr().unwrap());
let (writer, reader) = socket.framed(LineCodec).split();
let socket_read = reader.for_each(|(addr, msg)| {
println!("Got {:?}", msg);
handle.spawn(compute(addr, msg));
Ok(())
});
core.run(socket_read).unwrap();
}
用 $ nc -u localhost 8080
连接两个终端并发送一些文本,我可以看到来自第二个终端的消息是在第一个终端完成后处理的。
我需要更改什么?
从不 sleep
在异步代码中(也避免任何其他阻塞调用)。
您可能想像这样使用 Timeout
:
fn compute(handle: &Handle, addr: SocketAddr, _msg: Vec<u8>) -> Box<Future<Item = (), Error = ()>> {
println!("Starting to compute for: {}", addr);
Box::new(
Timeout::new(std::time::Duration::from_secs(8), handle)
.unwrap()
.map_err(|e| panic!("timeout failed: {:?}", e))
.and_then(move |()| {
println!("Done computing for for: {}", addr);
Ok(())
}),
)
}
正如@Stefan 在另一个回答中所说,您不应该阻塞异步代码。鉴于您的示例, sleep
看起来像是一些长时间计算的占位符。因此,您应该将该计算委托给另一个线程,而不是使用超时,例如 this example:
extern crate futures;
extern crate futures_cpupool;
use futures::Future;
use futures_cpupool::CpuPool;
...
let pool = CpuPool::new_num_cpus();
...
fn compute(handle: &Handle, addr: SocketAddr, _msg: Vec<u8>) -> Box<Future<Item = (), Error = ()>> {
// I don't know enough about Tokio to know how to make `pool` available here
pool.spawn_fn (|| {
println!("Starting to compute for: {}", addr);
std::thread::sleep(std::time::Duration::from_secs(8));
println!("Done computing for for: {}", addr);
Ok(())
})
}