如何查找 tokio::sync::mpsc::Receiver 是否已关闭?
How to find if tokio::sync::mpsc::Receiver has been closed?
我有一个循环,我在其中做一些工作并使用 Sender
发送结果。这项工作需要时间,我需要在失败的情况下重试。有可能在我重试时,接收器已关闭,我的重试将是浪费时间。因此,我需要一种无需发送消息即可检查 Receiver
是否可用的方法。
在理想情况下,我希望我的代码在伪代码中看起来像这样:
let (tx, rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(async move {
// do som stuff with rx and drop it after some time
rx.recv(...).await;
});
let mut attempts = 0;
loop {
if tx.is_closed() {
break;
}
if let Ok(result) = do_work().await {
attempts = 0;
let _ = tx.send(result).await;
} else {
if attempts >= 10 {
break;
} else {
attempts += 1;
continue;
}
}
};
问题是 Sender
没有 is_closed
方法。它确实有 pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>>
,但我不知道 Context
是什么,也不知道在哪里可以找到它。
当我没有要发送的值时,如何检查发送方是否能够发送?
Sender
有一个 try_send 方法:
Attempts to immediately send a message on this Sender
This method differs from send by returning immediately if the channel's buffer is full or no receiver is waiting to acquire some data. Compared with send, this function has two failure cases instead of one (one for disconnection, one for a full buffer).
使用它代替 send
并检查错误:
if let Err(TrySendError::Closed(_)) = tx.send(result).await {
break;
}
可以使用 futures
crate 中的 poll_fn
来做你想做的事。它使函数 returning Poll
适应 return a Future
use futures::future::poll_fn; // 0.3.5
use std::future::Future;
use tokio::sync::mpsc::{channel, error::ClosedError, Sender}; // 0.2.22
use tokio::time::delay_for; // 0.2.22
fn wait_until_ready<'a, T>(
sender: &'a mut Sender<T>,
) -> impl Future<Output = Result<(), ClosedError>> + 'a {
poll_fn(move |cx| sender.poll_ready(cx))
}
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = channel::<i32>(1);
tokio::spawn(async move {
// Receive one value and close the channel;
let val = rx.recv().await;
println!("{:?}", val);
});
wait_until_ready(&mut tx).await.unwrap();
tx.send(123).await.unwrap();
wait_until_ready(&mut tx).await.unwrap();
delay_for(std::time::Duration::from_secs(1)).await;
tx.send(456).await.unwrap(); // 456 likely never printed out,
// despite having a positive readiness response
// and the send "succeeding"
}
但是请注意,在一般情况下,这很容易受到 TOCTOU 的影响。即使 Sender
的 poll_ready
在通道中保留了一个插槽供以后使用,接收端也有可能在就绪检查和实际发送之间关闭。我试图在代码中指出这一点。
发送接收方忽略的空消息。它可以是任何东西。例如,如果您现在发送 T
,您可以将其更改为 Option<T>
并让接收方忽略 None
s。
Yeah, that will work, although I don't really liked this approach since I need to change communication format.
我不会挂断通信格式。这不是一个 well-defined 网络协议,应该从实现细节中分离出来;它是您自己的两段代码之间的内部通信机制。
我有一个循环,我在其中做一些工作并使用 Sender
发送结果。这项工作需要时间,我需要在失败的情况下重试。有可能在我重试时,接收器已关闭,我的重试将是浪费时间。因此,我需要一种无需发送消息即可检查 Receiver
是否可用的方法。
在理想情况下,我希望我的代码在伪代码中看起来像这样:
let (tx, rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(async move {
// do som stuff with rx and drop it after some time
rx.recv(...).await;
});
let mut attempts = 0;
loop {
if tx.is_closed() {
break;
}
if let Ok(result) = do_work().await {
attempts = 0;
let _ = tx.send(result).await;
} else {
if attempts >= 10 {
break;
} else {
attempts += 1;
continue;
}
}
};
问题是 Sender
没有 is_closed
方法。它确实有 pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>>
,但我不知道 Context
是什么,也不知道在哪里可以找到它。
当我没有要发送的值时,如何检查发送方是否能够发送?
Sender
有一个 try_send 方法:
Attempts to immediately send a message on this Sender
This method differs from send by returning immediately if the channel's buffer is full or no receiver is waiting to acquire some data. Compared with send, this function has two failure cases instead of one (one for disconnection, one for a full buffer).
使用它代替 send
并检查错误:
if let Err(TrySendError::Closed(_)) = tx.send(result).await {
break;
}
可以使用 futures
crate 中的 poll_fn
来做你想做的事。它使函数 returning Poll
适应 return a Future
use futures::future::poll_fn; // 0.3.5
use std::future::Future;
use tokio::sync::mpsc::{channel, error::ClosedError, Sender}; // 0.2.22
use tokio::time::delay_for; // 0.2.22
fn wait_until_ready<'a, T>(
sender: &'a mut Sender<T>,
) -> impl Future<Output = Result<(), ClosedError>> + 'a {
poll_fn(move |cx| sender.poll_ready(cx))
}
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = channel::<i32>(1);
tokio::spawn(async move {
// Receive one value and close the channel;
let val = rx.recv().await;
println!("{:?}", val);
});
wait_until_ready(&mut tx).await.unwrap();
tx.send(123).await.unwrap();
wait_until_ready(&mut tx).await.unwrap();
delay_for(std::time::Duration::from_secs(1)).await;
tx.send(456).await.unwrap(); // 456 likely never printed out,
// despite having a positive readiness response
// and the send "succeeding"
}
但是请注意,在一般情况下,这很容易受到 TOCTOU 的影响。即使 Sender
的 poll_ready
在通道中保留了一个插槽供以后使用,接收端也有可能在就绪检查和实际发送之间关闭。我试图在代码中指出这一点。
发送接收方忽略的空消息。它可以是任何东西。例如,如果您现在发送 T
,您可以将其更改为 Option<T>
并让接收方忽略 None
s。
Yeah, that will work, although I don't really liked this approach since I need to change communication format.
我不会挂断通信格式。这不是一个 well-defined 网络协议,应该从实现细节中分离出来;它是您自己的两段代码之间的内部通信机制。