当多个期货使用相同的底层套接字时,为什么我没有得到它们的唤醒?
Why do I not get a wakeup for multiple futures when they use the same underlying socket?
我有使用同一个本地 UdpSocket 将数据发送到多个 UDP 端点的代码:
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::{
future::Future,
net::{Ipv4Addr, SocketAddr},
pin::Pin,
task::{Context, Poll},
};
use tokio::net::UdpSocket;
#[tokio::main]
async fn main() {
let server_0: SocketAddr = (Ipv4Addr::UNSPECIFIED, 12000).into();
let server_2: SocketAddr = (Ipv4Addr::UNSPECIFIED, 12002).into();
let server_1: SocketAddr = (Ipv4Addr::UNSPECIFIED, 12001).into();
tokio::spawn(start_server(server_0));
tokio::spawn(start_server(server_1));
tokio::spawn(start_server(server_2));
let client_addr: SocketAddr = (Ipv4Addr::UNSPECIFIED, 12004).into();
let socket = UdpSocket::bind(client_addr).await.unwrap();
let mut futs = FuturesUnordered::new();
futs.push(Task::new(0, &socket, &server_0));
futs.push(Task::new(1, &socket, &server_1));
futs.push(Task::new(2, &socket, &server_2));
while let Some(n) = futs.next().await {
println!("Done: {:?}", n)
}
}
async fn start_server(addr: SocketAddr) {
let mut socket = UdpSocket::bind(addr).await.unwrap();
let mut buf = [0; 512];
loop {
println!("{:?}", socket.recv_from(&mut buf).await);
}
}
struct Task<'a> {
value: u32,
socket: &'a UdpSocket,
addr: &'a SocketAddr,
}
impl<'a> Task<'a> {
fn new(value: u32, socket: &'a UdpSocket, addr: &'a SocketAddr) -> Self {
Self {
value,
socket,
addr,
}
}
}
impl Future for Task<'_> {
type Output = Option<u32>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
println!("Polling for {}", self.value);
let buf = &self.value.to_be_bytes();
match self.socket.poll_send_to(cx, buf, self.addr) {
Poll::Ready(Ok(_)) => {
println!("Got Ok for {}", self.value);
Poll::Ready(Some(self.value))
}
Poll::Ready(Err(_)) => {
println!("Got err for {}", self.value);
Poll::Ready(None)
}
Poll::Pending => {
println!("Got pending for {}", self.value);
Poll::Pending
}
}
}
}
有时只写其中一条数据就卡住了,打印:
Polling for 0
Got pending for 0
Polling for 1
Got pending for 1
Polling for 2
Got pending for 2
Polling for 2
Got Ok for 2
Done: Some(2)
Ok((4, V4(127.0.0.1:12004)))
在这种情况下,值为 0 和 1 的任务永远不会被唤醒。我如何可靠地向他们发出信号以唤醒他们?
我尝试在收到 Poll::Ready
时调用 cx.waker().wake_by_ref()
,因为我认为这也可能会唤醒其他人,但事实并非如此。
当 poll_send_to
returns Poll::Pending
时,它保证向上下文中提供的 Waker
发出唤醒被轮询。但是,它只需要在最后一次 Waker
被轮询时发出唤醒。这意味着,由于您在多个任务的同一个套接字上调用 poll_send_to
,套接字只承诺向最后轮询它的套接字发出唤醒。
这也解释了为什么这样做:
let mut futs = Vec::new();
futs.push(Task::new(0, &socket, &server_0));
futs.push(Task::new(1, &socket, &server_1));
futs.push(Task::new(2, &socket, &server_2));
for n in join_all(futs).await {
println!("Done: {:?}", n)
}
与 FuturesUnordered
不同,join_all
组合器每次轮询时都会轮询每个内部未来,但 FuturesUnordered
会跟踪唤醒来自哪个潜在未来。
另见 this thread。
我有使用同一个本地 UdpSocket 将数据发送到多个 UDP 端点的代码:
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::{
future::Future,
net::{Ipv4Addr, SocketAddr},
pin::Pin,
task::{Context, Poll},
};
use tokio::net::UdpSocket;
#[tokio::main]
async fn main() {
let server_0: SocketAddr = (Ipv4Addr::UNSPECIFIED, 12000).into();
let server_2: SocketAddr = (Ipv4Addr::UNSPECIFIED, 12002).into();
let server_1: SocketAddr = (Ipv4Addr::UNSPECIFIED, 12001).into();
tokio::spawn(start_server(server_0));
tokio::spawn(start_server(server_1));
tokio::spawn(start_server(server_2));
let client_addr: SocketAddr = (Ipv4Addr::UNSPECIFIED, 12004).into();
let socket = UdpSocket::bind(client_addr).await.unwrap();
let mut futs = FuturesUnordered::new();
futs.push(Task::new(0, &socket, &server_0));
futs.push(Task::new(1, &socket, &server_1));
futs.push(Task::new(2, &socket, &server_2));
while let Some(n) = futs.next().await {
println!("Done: {:?}", n)
}
}
async fn start_server(addr: SocketAddr) {
let mut socket = UdpSocket::bind(addr).await.unwrap();
let mut buf = [0; 512];
loop {
println!("{:?}", socket.recv_from(&mut buf).await);
}
}
struct Task<'a> {
value: u32,
socket: &'a UdpSocket,
addr: &'a SocketAddr,
}
impl<'a> Task<'a> {
fn new(value: u32, socket: &'a UdpSocket, addr: &'a SocketAddr) -> Self {
Self {
value,
socket,
addr,
}
}
}
impl Future for Task<'_> {
type Output = Option<u32>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
println!("Polling for {}", self.value);
let buf = &self.value.to_be_bytes();
match self.socket.poll_send_to(cx, buf, self.addr) {
Poll::Ready(Ok(_)) => {
println!("Got Ok for {}", self.value);
Poll::Ready(Some(self.value))
}
Poll::Ready(Err(_)) => {
println!("Got err for {}", self.value);
Poll::Ready(None)
}
Poll::Pending => {
println!("Got pending for {}", self.value);
Poll::Pending
}
}
}
}
有时只写其中一条数据就卡住了,打印:
Polling for 0
Got pending for 0
Polling for 1
Got pending for 1
Polling for 2
Got pending for 2
Polling for 2
Got Ok for 2
Done: Some(2)
Ok((4, V4(127.0.0.1:12004)))
在这种情况下,值为 0 和 1 的任务永远不会被唤醒。我如何可靠地向他们发出信号以唤醒他们?
我尝试在收到 Poll::Ready
时调用 cx.waker().wake_by_ref()
,因为我认为这也可能会唤醒其他人,但事实并非如此。
当 poll_send_to
returns Poll::Pending
时,它保证向上下文中提供的 Waker
发出唤醒被轮询。但是,它只需要在最后一次 Waker
被轮询时发出唤醒。这意味着,由于您在多个任务的同一个套接字上调用 poll_send_to
,套接字只承诺向最后轮询它的套接字发出唤醒。
这也解释了为什么这样做:
let mut futs = Vec::new();
futs.push(Task::new(0, &socket, &server_0));
futs.push(Task::new(1, &socket, &server_1));
futs.push(Task::new(2, &socket, &server_2));
for n in join_all(futs).await {
println!("Done: {:?}", n)
}
与 FuturesUnordered
不同,join_all
组合器每次轮询时都会轮询每个内部未来,但 FuturesUnordered
会跟踪唤醒来自哪个潜在未来。
另见 this thread。