从频道读取并使用 poll_fn 和 try_ready 的 Tokio 未来永远不会完成
Tokio future that reads from a channel and uses poll_fn and try_ready never completes
我有一个永远不会完成的 Tokio 未来(rx
是一个 Receiver
而 sock
是一个 tokio UdpSocket
)。它基本上从数据包队列中读取数据包并通过套接字传输它们:
poll_fn(move || {
match try_ready!(rx
.poll()
.map_err(|_e| tokio::io::Error::new(tokio::io::ErrorKind::Other, "Poll error")))
{
Some((packet, to)) => {
println!(
"Rx: Received {} bytes for {}: {:?}",
packet.len(),
to,
packet.as_slice(),
);
try_ready!(sock.poll_send_to(packet.as_slice(), &to));
println!("Sent");
}
None => println!("Rx end"),
}
Ok(futures::Async::Ready(()))
})
.map_err(|e: tokio::io::Error| println!("Error: {:?}", e))
它执行到 poll_send_to
行(poll_send_to
之前的 println!
执行,之后的 println!
不执行)然后永远等待而不发送数据包。
我将上面的 future 替换为以下 future 以确保它不是套接字问题(我认为之前有一些问题是不稳定的通知):
poll_fn(move || {
let packet = vec![0;10];
let to = SocketAddr::from_str("127.0.0.1:8001").expect("Parse error");
try_ready!(sock.poll_send_to(packet.as_slice(), &to));
Ok(futures::Async::Ready(()))
})
.map_err(|e: tokio::io::Error| println!("Error: {:?}", e))
这个 future 工作得很好——它按预期发送了数据包并退出了程序。
鉴于 rx
可以 poll
成功并打印 println
消息,我认为问题不在于消息通道。鉴于第二个未来可行,我认为问题不在于套接字。我是直接通过 Wireshark 观察数据包,所以我认为这也不是我观察的问题。
我对 Rust 和 Tokio 很陌生,所以我可能忽略了一些基本事实(例如,不能在同一个 future 中两次 try_ready
,future 不会从它离开的地方恢复之前关闭等)。
你能帮我找出第一个未来的问题吗?
use futures::future::lazy;
use futures::stream::Stream;
use futures::try_ready;
use std::net::SocketAddr;
use std::str::FromStr;
use tokio;
use tokio::net::UdpSocket;
use tokio::prelude::future::poll_fn;
use tokio::prelude::Future;
fn main() {
let mut sock = UdpSocket::bind(&SocketAddr::from_str("127.0.0.1:8000").expect("Parse error"))
.expect("Bind error");
let (mut tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, SocketAddr)>(2000);
tokio::run(lazy(move || {
//----------------- This future works ----------------//
// tokio::spawn(
// poll_fn(move || {
// let packet = vec![70; 10];
// let to = SocketAddr::from_str("127.0.0.1:8001").expect("Parse error");
// try_ready!(sock.poll_send_to(packet.as_slice(), &to));
// Ok(futures::Async::Ready(()))
// })
// .map_err(|e: tokio::io::Error| println!("Error: {:?}", e)),
// );
//----------------- This future doesn't ----------------//
tokio::spawn(
poll_fn(move || {
match try_ready!(rx
.poll()
.map_err(|_e| tokio::io::Error::new(tokio::io::ErrorKind::Other, "Poll error")))
{
Some((packet, to)) => {
// This is printed
println!(
"Rx: Received {} bytes for {}: {:?}",
packet.len(),
to,
packet.as_slice(),
);
try_ready!(sock.poll_send_to(packet.as_slice(), &to));
// This is never printed
println!("Sent");
}
None => println!("Rx end"),
}
Ok(futures::Async::Ready(()))
})
.map_err(|e: tokio::io::Error| println!("Error: {:?}", e)),
);
//----------------- This future queues a packet ----------------//
tokio::spawn(
poll_fn(move || {
try_ready!(tx.poll_ready());
tx.try_send((
vec![70; 10],
SocketAddr::from_str("127.0.0.1:8001").expect("Parse error"),
))
.expect("Send error");
// Wait permanently so message channel doesn't get disconnected
// Achieved differently in production
Ok(futures::Async::NotReady)
})
.map_err(|e: tokio::sync::mpsc::error::SendError| println!("Error: {:?}", e)),
);
Ok(())
}));
}
使用这个版本你以后显示问题:
tokio::spawn(
future::poll_fn(move || {
eprintln!("Starting poll_fn");
let from_channel = rx
.poll()
.map_err(|_e| tokio::io::Error::new(tokio::io::ErrorKind::Other, "Poll error"));
if let Some((packet, to)) = futures::try_ready!(dbg!(from_channel)) {
futures::try_ready!(dbg!(sock.poll_send_to(packet.as_slice(), &to)));
}
Ok(futures::Async::Ready(()))
})
.map_err(|e: tokio::io::Error| println!("Error: {:?}", e)),
);
这是稍微清理过的输出:
Starting poll_fn
[src/main.rs:21] from_channel = Ok(NotReady)
Starting poll_fn
[src/main.rs:21] from_channel = Ok(Ready(Some(/* ... */)))
[src/main.rs:22] sock.poll_send_to(packet.as_slice(), &to) = Ok(NotReady)
Starting poll_fn
[src/main.rs:21] from_channel = Ok(NotReady)
换言之:
- 未来开始。
- 频道没有准备好;频道注册通知。
- 未来returns.
- 通道获取值并通知任务。
- 未来重新开始。
- 通道中有一个值。
- 在套接字上发送还没有准备好;套接字注册通知。
- 未来returns.
- 套接字被清除并通知任务。
- 未来重新开始。
- 频道没有准备好;频道注册通知。
- 未来returns.
- 频道中从未添加任何其他内容。
简而言之,您没有正确维护未来的状态机。你需要知道你上次在未来 运行 时走了多远,并在下次 运行.
时从那个点开始
async
/ await
语法备受期待是有原因的:它会为您编写这些状态机。
我不知道为什么您选择使用基于poll
的较低级别的界面。我会使用更高级别的基于 Future
的:
tokio::spawn({
rx.fold(sock, |sock, (packet, to)| {
sock.send_dgram(packet, &to)
.inspect(|_| println!("Sent it!"))
.map(|(sock, _)| sock)
.map_err(|e| panic!("Error: {:?}", e))
})
.map(drop)
.map_err(|e| panic!("Error: {:?}", e))
});
the Future
-based interface [...] destroys the socket(and buffer) on error
这是使用基于 poll
的界面的一个很好的理由,但我仍然会深入研究它足够长的时间来实现你自己的未来。像这样:
struct X(UdpSocket);
struct XSendGram<D> {
sock: Option<UdpSocket>,
data: D,
addr: SocketAddr,
}
impl X {
fn send_dgram<D>(self, data: D, addr: SocketAddr) -> XSendGram<D> {
XSendGram {
sock: Some(self.0),
data,
addr,
}
}
}
impl<D> Future for XSendGram<D>
where
D: AsRef<[u8]>,
{
type Item = (X, usize);
type Error = (X, std::io::Error);
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
let mut sock = self.sock.take().expect("Future called after success or failure");
match sock.poll_send_to(self.data.as_ref(), &self.addr) {
Ok(Async::Ready(bytes)) => Ok(Async::Ready((X(sock), bytes))),
Ok(Async::NotReady) => {
self.sock = Some(sock); // Restore it for the next call
Ok(Async::NotReady)
}
Err(e) => Err((X(sock), e)),
}
}
}
tokio::spawn({
rx.fold(X(sock), |sock, (packet, to)| {
sock.send_dgram(packet, to)
.inspect(|(_, n)| println!("Sent {} bytes", n))
.then(|r| match r {
Ok((sock, _)) | Err((sock, _)) => future::ok(sock),
})
})
.map(drop)
.map_err(|e| panic!("Error: {:?}", e))
});
我有一个永远不会完成的 Tokio 未来(rx
是一个 Receiver
而 sock
是一个 tokio UdpSocket
)。它基本上从数据包队列中读取数据包并通过套接字传输它们:
poll_fn(move || {
match try_ready!(rx
.poll()
.map_err(|_e| tokio::io::Error::new(tokio::io::ErrorKind::Other, "Poll error")))
{
Some((packet, to)) => {
println!(
"Rx: Received {} bytes for {}: {:?}",
packet.len(),
to,
packet.as_slice(),
);
try_ready!(sock.poll_send_to(packet.as_slice(), &to));
println!("Sent");
}
None => println!("Rx end"),
}
Ok(futures::Async::Ready(()))
})
.map_err(|e: tokio::io::Error| println!("Error: {:?}", e))
它执行到 poll_send_to
行(poll_send_to
之前的 println!
执行,之后的 println!
不执行)然后永远等待而不发送数据包。
我将上面的 future 替换为以下 future 以确保它不是套接字问题(我认为之前有一些问题是不稳定的通知):
poll_fn(move || {
let packet = vec![0;10];
let to = SocketAddr::from_str("127.0.0.1:8001").expect("Parse error");
try_ready!(sock.poll_send_to(packet.as_slice(), &to));
Ok(futures::Async::Ready(()))
})
.map_err(|e: tokio::io::Error| println!("Error: {:?}", e))
这个 future 工作得很好——它按预期发送了数据包并退出了程序。
鉴于 rx
可以 poll
成功并打印 println
消息,我认为问题不在于消息通道。鉴于第二个未来可行,我认为问题不在于套接字。我是直接通过 Wireshark 观察数据包,所以我认为这也不是我观察的问题。
我对 Rust 和 Tokio 很陌生,所以我可能忽略了一些基本事实(例如,不能在同一个 future 中两次 try_ready
,future 不会从它离开的地方恢复之前关闭等)。
你能帮我找出第一个未来的问题吗?
use futures::future::lazy;
use futures::stream::Stream;
use futures::try_ready;
use std::net::SocketAddr;
use std::str::FromStr;
use tokio;
use tokio::net::UdpSocket;
use tokio::prelude::future::poll_fn;
use tokio::prelude::Future;
fn main() {
let mut sock = UdpSocket::bind(&SocketAddr::from_str("127.0.0.1:8000").expect("Parse error"))
.expect("Bind error");
let (mut tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, SocketAddr)>(2000);
tokio::run(lazy(move || {
//----------------- This future works ----------------//
// tokio::spawn(
// poll_fn(move || {
// let packet = vec![70; 10];
// let to = SocketAddr::from_str("127.0.0.1:8001").expect("Parse error");
// try_ready!(sock.poll_send_to(packet.as_slice(), &to));
// Ok(futures::Async::Ready(()))
// })
// .map_err(|e: tokio::io::Error| println!("Error: {:?}", e)),
// );
//----------------- This future doesn't ----------------//
tokio::spawn(
poll_fn(move || {
match try_ready!(rx
.poll()
.map_err(|_e| tokio::io::Error::new(tokio::io::ErrorKind::Other, "Poll error")))
{
Some((packet, to)) => {
// This is printed
println!(
"Rx: Received {} bytes for {}: {:?}",
packet.len(),
to,
packet.as_slice(),
);
try_ready!(sock.poll_send_to(packet.as_slice(), &to));
// This is never printed
println!("Sent");
}
None => println!("Rx end"),
}
Ok(futures::Async::Ready(()))
})
.map_err(|e: tokio::io::Error| println!("Error: {:?}", e)),
);
//----------------- This future queues a packet ----------------//
tokio::spawn(
poll_fn(move || {
try_ready!(tx.poll_ready());
tx.try_send((
vec![70; 10],
SocketAddr::from_str("127.0.0.1:8001").expect("Parse error"),
))
.expect("Send error");
// Wait permanently so message channel doesn't get disconnected
// Achieved differently in production
Ok(futures::Async::NotReady)
})
.map_err(|e: tokio::sync::mpsc::error::SendError| println!("Error: {:?}", e)),
);
Ok(())
}));
}
使用这个版本你以后显示问题:
tokio::spawn(
future::poll_fn(move || {
eprintln!("Starting poll_fn");
let from_channel = rx
.poll()
.map_err(|_e| tokio::io::Error::new(tokio::io::ErrorKind::Other, "Poll error"));
if let Some((packet, to)) = futures::try_ready!(dbg!(from_channel)) {
futures::try_ready!(dbg!(sock.poll_send_to(packet.as_slice(), &to)));
}
Ok(futures::Async::Ready(()))
})
.map_err(|e: tokio::io::Error| println!("Error: {:?}", e)),
);
这是稍微清理过的输出:
Starting poll_fn
[src/main.rs:21] from_channel = Ok(NotReady)
Starting poll_fn
[src/main.rs:21] from_channel = Ok(Ready(Some(/* ... */)))
[src/main.rs:22] sock.poll_send_to(packet.as_slice(), &to) = Ok(NotReady)
Starting poll_fn
[src/main.rs:21] from_channel = Ok(NotReady)
换言之:
- 未来开始。
- 频道没有准备好;频道注册通知。
- 未来returns.
- 通道获取值并通知任务。
- 未来重新开始。
- 通道中有一个值。
- 在套接字上发送还没有准备好;套接字注册通知。
- 未来returns.
- 套接字被清除并通知任务。
- 未来重新开始。
- 频道没有准备好;频道注册通知。
- 未来returns.
- 频道中从未添加任何其他内容。
简而言之,您没有正确维护未来的状态机。你需要知道你上次在未来 运行 时走了多远,并在下次 运行.
时从那个点开始async
/ await
语法备受期待是有原因的:它会为您编写这些状态机。
我不知道为什么您选择使用基于poll
的较低级别的界面。我会使用更高级别的基于 Future
的:
tokio::spawn({
rx.fold(sock, |sock, (packet, to)| {
sock.send_dgram(packet, &to)
.inspect(|_| println!("Sent it!"))
.map(|(sock, _)| sock)
.map_err(|e| panic!("Error: {:?}", e))
})
.map(drop)
.map_err(|e| panic!("Error: {:?}", e))
});
the
Future
-based interface [...] destroys the socket(and buffer) on error
这是使用基于 poll
的界面的一个很好的理由,但我仍然会深入研究它足够长的时间来实现你自己的未来。像这样:
struct X(UdpSocket);
struct XSendGram<D> {
sock: Option<UdpSocket>,
data: D,
addr: SocketAddr,
}
impl X {
fn send_dgram<D>(self, data: D, addr: SocketAddr) -> XSendGram<D> {
XSendGram {
sock: Some(self.0),
data,
addr,
}
}
}
impl<D> Future for XSendGram<D>
where
D: AsRef<[u8]>,
{
type Item = (X, usize);
type Error = (X, std::io::Error);
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
let mut sock = self.sock.take().expect("Future called after success or failure");
match sock.poll_send_to(self.data.as_ref(), &self.addr) {
Ok(Async::Ready(bytes)) => Ok(Async::Ready((X(sock), bytes))),
Ok(Async::NotReady) => {
self.sock = Some(sock); // Restore it for the next call
Ok(Async::NotReady)
}
Err(e) => Err((X(sock), e)),
}
}
}
tokio::spawn({
rx.fold(X(sock), |sock, (packet, to)| {
sock.send_dgram(packet, to)
.inspect(|(_, n)| println!("Sent {} bytes", n))
.then(|r| match r {
Ok((sock, _)) | Err((sock, _)) => future::ok(sock),
})
})
.map(drop)
.map_err(|e| panic!("Error: {:?}", e))
});