Tokio 任务能否优雅地终止整个运行时?
Can a Tokio task terminate the whole runtime gracefully?
我使用如下代码启动 Tokio 运行时:
tokio::run(my_future);
我的未来继续根据各种情况开始一堆任务。
其中一项任务负责确定程序何时关闭。但是,我不知道如何让该任务优雅地终止程序。理想情况下,我想为此任务找到一种方法来终止 run
函数调用。
下面是我想编写的程序类型的示例:
extern crate tokio;
use tokio::prelude::*;
use std::time::Duration;
use std::time::Instant;
use tokio::timer::{Delay, Interval};
fn main() {
let kill_future = Delay::new(Instant::now() + Duration::from_secs(3));
let time_print_future = Interval::new_interval(Duration::from_secs(1));
let mut runtime = tokio::runtime::Runtime::new().expect("failed to start new Runtime");
runtime.spawn(time_print_future.for_each(|t| Ok(println!("{:?}", t))).map_err(|_| ()));
runtime.spawn(
kill_future
.map_err(|_| {
eprintln!("Timer error");
})
.map(move |()| {
// TODO
unimplemented!("Shutdown the runtime!");
}),
);
// TODO
unimplemented!("Block until the runtime is shutdown");
println!("Done");
}
shutdown_now
看起来很有希望,但经过进一步调查,它可能行不通。特别是,它拥有运行时的所有权,而 Tokio 可能不允许主线程(创建运行时的地方)和一些随机任务拥有运行时。
您可以使用 oneshot channel 从运行时内部到外部进行通信。当延迟到期时,我们通过通道发送一条消息。
在运行时之外,一旦我们收到该消息,我们就会关闭运行时并wait
让它完成。
use std::time::{Duration, Instant};
use tokio::{
prelude::*,
runtime::Runtime,
sync::oneshot,
timer::{Delay, Interval},
}; // 0.1.15
fn main() {
let mut runtime = Runtime::new().expect("failed to start new Runtime");
let (tx, rx) = oneshot::channel();
runtime.spawn({
let every_second = Interval::new_interval(Duration::from_secs(1));
every_second
.for_each(|t| Ok(println!("{:?}", t)))
.map_err(drop)
});
runtime.spawn({
let in_three_seconds = Delay::new(Instant::now() + Duration::from_secs(3));
in_three_seconds
.map_err(|_| eprintln!("Timer error"))
.and_then(move |_| tx.send(()))
});
rx.wait().expect("unable to wait for receiver");
runtime
.shutdown_now()
.wait()
.expect("unable to wait for shutdown");
println!("Done");
}
另请参阅:
- Is there any way to shutdown `tokio::runtime::current_thread::Runtime`?
我使用如下代码启动 Tokio 运行时:
tokio::run(my_future);
我的未来继续根据各种情况开始一堆任务。
其中一项任务负责确定程序何时关闭。但是,我不知道如何让该任务优雅地终止程序。理想情况下,我想为此任务找到一种方法来终止 run
函数调用。
下面是我想编写的程序类型的示例:
extern crate tokio;
use tokio::prelude::*;
use std::time::Duration;
use std::time::Instant;
use tokio::timer::{Delay, Interval};
fn main() {
let kill_future = Delay::new(Instant::now() + Duration::from_secs(3));
let time_print_future = Interval::new_interval(Duration::from_secs(1));
let mut runtime = tokio::runtime::Runtime::new().expect("failed to start new Runtime");
runtime.spawn(time_print_future.for_each(|t| Ok(println!("{:?}", t))).map_err(|_| ()));
runtime.spawn(
kill_future
.map_err(|_| {
eprintln!("Timer error");
})
.map(move |()| {
// TODO
unimplemented!("Shutdown the runtime!");
}),
);
// TODO
unimplemented!("Block until the runtime is shutdown");
println!("Done");
}
shutdown_now
看起来很有希望,但经过进一步调查,它可能行不通。特别是,它拥有运行时的所有权,而 Tokio 可能不允许主线程(创建运行时的地方)和一些随机任务拥有运行时。
您可以使用 oneshot channel 从运行时内部到外部进行通信。当延迟到期时,我们通过通道发送一条消息。
在运行时之外,一旦我们收到该消息,我们就会关闭运行时并wait
让它完成。
use std::time::{Duration, Instant};
use tokio::{
prelude::*,
runtime::Runtime,
sync::oneshot,
timer::{Delay, Interval},
}; // 0.1.15
fn main() {
let mut runtime = Runtime::new().expect("failed to start new Runtime");
let (tx, rx) = oneshot::channel();
runtime.spawn({
let every_second = Interval::new_interval(Duration::from_secs(1));
every_second
.for_each(|t| Ok(println!("{:?}", t)))
.map_err(drop)
});
runtime.spawn({
let in_three_seconds = Delay::new(Instant::now() + Duration::from_secs(3));
in_three_seconds
.map_err(|_| eprintln!("Timer error"))
.and_then(move |_| tx.send(()))
});
rx.wait().expect("unable to wait for receiver");
runtime
.shutdown_now()
.wait()
.expect("unable to wait for shutdown");
println!("Done");
}
另请参阅:
- Is there any way to shutdown `tokio::runtime::current_thread::Runtime`?