如何使用 Tokio 远程关闭 运行 任务
How to remotely shut down running tasks with Tokio
我有一个正在接收数据的 UDP 套接字
pub async fn start() -> Result<(), std::io::Error> {
loop {
let mut data = vec![0; 1024];
socket.recv_from(&mut data).await?;
}
}
当没有数据传入时,此代码当前在 .await
上被阻止。我想从我的主线程正常关闭我的服务器,那么我如何向这个 [=12] 发送信号=] 它应该停止休眠并关闭?
注意: Tokio 网站在 graceful shutdown 上有一个页面。
如果您有多个任务要终止,您应该使用 broadcast channel to send shutdown messages. You can use it together with tokio::select!
。
use tokio::sync::broadcast::Receiver;
// You may want to log errors rather than return them in this function.
pub async fn start(kill: Receiver<()>) -> Result<(), std::io::Error> {
tokio::select! {
output = real_start() => output,
_ = kill.recv() => Err(...),
}
}
pub async fn real_start() -> Result<(), std::io::Error> {
loop {
let mut data = vec![0; 1024];
socket.recv_from(&mut data).await?;
}
}
然后杀死所有任务,在频道上发送消息。
只杀掉单个任务,可以使用JoinHandle::abort
方法,会尽快杀掉任务。请注意,此方法仅在 Tokio 1.x 和 0.3.x 中可用,要使用 Tokio 0.2.x 中止任务,请参阅下面的下一节。
let task = tokio::spawn(start());
...
task.abort();
作为 JoinHandle::abort
的替代方法,您可以使用 futures crate 中的 abortable
。生成任务时,执行以下操作:
let (task, handle) = abortable(start());
tokio::spawn(task);
然后你可以通过调用abort
方法终止任务。
handle.abort();
当然,带select!
的频道也可以用来kill单个任务,也许结合oneshot
频道而不是广播频道。
所有这些方法都保证 real_start
方法在 .await
处被杀死。当它是两个 .await
之间的 运行 代码时,不可能终止任务。你可以阅读更多关于为什么这是 here.
mini-redis project contains an accessible real-world example of graceful shutdown of a server. Additionally, the Tokio tutorial has chapters on both select and channels.
我有一个正在接收数据的 UDP 套接字
pub async fn start() -> Result<(), std::io::Error> {
loop {
let mut data = vec![0; 1024];
socket.recv_from(&mut data).await?;
}
}
当没有数据传入时,此代码当前在 .await
上被阻止。我想从我的主线程正常关闭我的服务器,那么我如何向这个 [=12] 发送信号=] 它应该停止休眠并关闭?
注意: Tokio 网站在 graceful shutdown 上有一个页面。
如果您有多个任务要终止,您应该使用 broadcast channel to send shutdown messages. You can use it together with tokio::select!
。
use tokio::sync::broadcast::Receiver;
// You may want to log errors rather than return them in this function.
pub async fn start(kill: Receiver<()>) -> Result<(), std::io::Error> {
tokio::select! {
output = real_start() => output,
_ = kill.recv() => Err(...),
}
}
pub async fn real_start() -> Result<(), std::io::Error> {
loop {
let mut data = vec![0; 1024];
socket.recv_from(&mut data).await?;
}
}
然后杀死所有任务,在频道上发送消息。
只杀掉单个任务,可以使用JoinHandle::abort
方法,会尽快杀掉任务。请注意,此方法仅在 Tokio 1.x 和 0.3.x 中可用,要使用 Tokio 0.2.x 中止任务,请参阅下面的下一节。
let task = tokio::spawn(start());
...
task.abort();
作为 JoinHandle::abort
的替代方法,您可以使用 futures crate 中的 abortable
。生成任务时,执行以下操作:
let (task, handle) = abortable(start());
tokio::spawn(task);
然后你可以通过调用abort
方法终止任务。
handle.abort();
当然,带select!
的频道也可以用来kill单个任务,也许结合oneshot
频道而不是广播频道。
所有这些方法都保证 real_start
方法在 .await
处被杀死。当它是两个 .await
之间的 运行 代码时,不可能终止任务。你可以阅读更多关于为什么这是 here.
mini-redis project contains an accessible real-world example of graceful shutdown of a server. Additionally, the Tokio tutorial has chapters on both select and channels.