处理用户输入
handling user input
如何让我的应用程序的某些部分读取用户输入并监听关闭。
根据 tokio-docs 要执行此类操作,我应该在生成的任务中使用阻塞 IO。
For interactive uses, it is recommended to spawn a thread dedicated to user input and use blocking IO directly in that thread.
所以我做了这样的事情
async fn read_input(mut rx: watch::Receiver<&str>) {
let mut line = String::new();
let stdin = io::stdin();
loop {
stdin.lock().read_line(&mut line).expect("Could not read line");
let op = line.trim_right();
if op == "EXIT" {
break;
} else if op == "send" {
// send_stuff();
}
line.clear();
}
}
问题是,我如何检查接收器通道是否关闭并打破这个循环?
如果我等待代码将阻塞。
我是不是用错误的方法来处理这个问题 concept/architecture?
在不管理自己的线程的情况下,必须有一种方法可以在标准输入上使用一些非阻塞 OS API 并将其包装用于 tokio(tokio::io::Stdin 1.12 使用阻塞变体)。
否则,如果我们遵循文档的建议并生成我们自己的线程,
这是可以做到的:
fn start_reading_stdin_lines(
sender: tokio::sync::mpsc::Sender<String>,
runtime: tokio::runtime::Handle
) {
std::thread::spawn(move || {
let stdin = std::io::stdin();
let mut line_buf = String::new();
while let Ok(_) = stdin.read_line(&mut line_buf) {
let line = line_buf.trim_end().to_string();
line_buf.clear();
let sender2 = sender.clone();
runtime.spawn(async move {
let result = sender2.send(line).await;
if let Err(error) = result {
println!("start_reading_stdin_lines send error: {:?}", error);
}
});
}
});
}
fn start_activity_until_shutdown(watch_sender: tokio::sync::watch::Sender<bool>) {
tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
println!("exiting after a signal...");
let result = watch_sender.send(true);
if let Err(error) = result {
println!("watch_sender send error: {:?}", error);
}
});
}
async fn read_input(
mut line_receiver: tokio::sync::mpsc::Receiver<String>,
mut watch_receiver: tokio::sync::watch::Receiver<bool>
) {
loop {
tokio::select! {
Some(line) = line_receiver.recv() => {
println!("line: {}", line);
// process the input
match line.as_str() {
"exit" => {
println!("exiting manually...");
break;
},
"send" => {
println!("send_stuff");
}
unexpected_line => {
println!("unexpected command: {}", unexpected_line);
}
}
}
Ok(_) = watch_receiver.changed() => {
println!("shutdown");
break;
}
}
}
}
#[tokio::main]
async fn main() {
let (line_sender, line_receiver) = tokio::sync::mpsc::channel(1);
start_reading_stdin_lines(line_sender, tokio::runtime::Handle::current());
let (watch_sender, watch_receiver) = tokio::sync::watch::channel(false);
// this will send a shutdown signal at some point
start_activity_until_shutdown(watch_sender);
read_input(line_receiver, watch_receiver).await;
}
潜在改进:
如何让我的应用程序的某些部分读取用户输入并监听关闭。
根据 tokio-docs 要执行此类操作,我应该在生成的任务中使用阻塞 IO。
For interactive uses, it is recommended to spawn a thread dedicated to user input and use blocking IO directly in that thread.
所以我做了这样的事情
async fn read_input(mut rx: watch::Receiver<&str>) {
let mut line = String::new();
let stdin = io::stdin();
loop {
stdin.lock().read_line(&mut line).expect("Could not read line");
let op = line.trim_right();
if op == "EXIT" {
break;
} else if op == "send" {
// send_stuff();
}
line.clear();
}
}
问题是,我如何检查接收器通道是否关闭并打破这个循环? 如果我等待代码将阻塞。
我是不是用错误的方法来处理这个问题 concept/architecture?
在不管理自己的线程的情况下,必须有一种方法可以在标准输入上使用一些非阻塞 OS API 并将其包装用于 tokio(tokio::io::Stdin 1.12 使用阻塞变体)。
否则,如果我们遵循文档的建议并生成我们自己的线程, 这是可以做到的:
fn start_reading_stdin_lines(
sender: tokio::sync::mpsc::Sender<String>,
runtime: tokio::runtime::Handle
) {
std::thread::spawn(move || {
let stdin = std::io::stdin();
let mut line_buf = String::new();
while let Ok(_) = stdin.read_line(&mut line_buf) {
let line = line_buf.trim_end().to_string();
line_buf.clear();
let sender2 = sender.clone();
runtime.spawn(async move {
let result = sender2.send(line).await;
if let Err(error) = result {
println!("start_reading_stdin_lines send error: {:?}", error);
}
});
}
});
}
fn start_activity_until_shutdown(watch_sender: tokio::sync::watch::Sender<bool>) {
tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
println!("exiting after a signal...");
let result = watch_sender.send(true);
if let Err(error) = result {
println!("watch_sender send error: {:?}", error);
}
});
}
async fn read_input(
mut line_receiver: tokio::sync::mpsc::Receiver<String>,
mut watch_receiver: tokio::sync::watch::Receiver<bool>
) {
loop {
tokio::select! {
Some(line) = line_receiver.recv() => {
println!("line: {}", line);
// process the input
match line.as_str() {
"exit" => {
println!("exiting manually...");
break;
},
"send" => {
println!("send_stuff");
}
unexpected_line => {
println!("unexpected command: {}", unexpected_line);
}
}
}
Ok(_) = watch_receiver.changed() => {
println!("shutdown");
break;
}
}
}
}
#[tokio::main]
async fn main() {
let (line_sender, line_receiver) = tokio::sync::mpsc::channel(1);
start_reading_stdin_lines(line_sender, tokio::runtime::Handle::current());
let (watch_sender, watch_receiver) = tokio::sync::watch::channel(false);
// this will send a shutdown signal at some point
start_activity_until_shutdown(watch_sender);
read_input(line_receiver, watch_receiver).await;
}
潜在改进: