处理用户输入

handling user input

如何让我的应用程序的某些部分读取用户输入并监听关闭。

根据 tokio-docs 要执行此类操作,我应该在生成的任务中使用阻塞 IO。

For interactive uses, it is recommended to spawn a thread dedicated to user input and use blocking IO directly in that thread.

所以我做了这样的事情

async fn read_input(mut rx: watch::Receiver<&str>) {
    let mut line = String::new();
    let stdin = io::stdin();

    loop {
        stdin.lock().read_line(&mut line).expect("Could not read line");

        let op = line.trim_right();
        if op == "EXIT" {
            break;
        } else if op == "send" {
            // send_stuff();
        }
        line.clear();

    }
}

问题是,我如何检查接收器通道是否关闭并打破这个循环? 如果我等待代码将阻塞。

我是不是用错误的方法来处理这个问题 concept/architecture?

在不管理自己的线程的情况下,必须有一种方法可以在标准输入上使用一些非阻塞 OS API 并将其包装用于 tokio(tokio::io::Stdin 1.12 使用阻塞变体)。

否则,如果我们遵循文档的建议并生成我们自己的线程, 这是可以做到的:

fn start_reading_stdin_lines(
    sender: tokio::sync::mpsc::Sender<String>,
    runtime: tokio::runtime::Handle
) {
    std::thread::spawn(move || {
        let stdin = std::io::stdin();
        let mut line_buf = String::new();
        while let Ok(_) = stdin.read_line(&mut line_buf) {
            let line = line_buf.trim_end().to_string();
            line_buf.clear();
            let sender2 = sender.clone();

            runtime.spawn(async move {
                let result = sender2.send(line).await;
                if let Err(error) = result {
                    println!("start_reading_stdin_lines send error: {:?}", error);
                }
            });
        }
    });
}

fn start_activity_until_shutdown(watch_sender: tokio::sync::watch::Sender<bool>) {
    tokio::spawn(async move {
        tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
        println!("exiting after a signal...");
        let result = watch_sender.send(true);
        if let Err(error) = result {
            println!("watch_sender send error: {:?}", error);
        }
    });
}

async fn read_input(
    mut line_receiver: tokio::sync::mpsc::Receiver<String>,
    mut watch_receiver: tokio::sync::watch::Receiver<bool>
) {
    loop {
        tokio::select! {
            Some(line) = line_receiver.recv() => {
                println!("line: {}", line);
                // process the input
                match line.as_str() {
                    "exit" => {
                        println!("exiting manually...");
                        break;
                    },
                    "send" => {
                        println!("send_stuff");
                    }
                    unexpected_line => {
                        println!("unexpected command: {}", unexpected_line);
                    }
                }
            }
            Ok(_) = watch_receiver.changed() => {
                println!("shutdown");
                break;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (line_sender, line_receiver) = tokio::sync::mpsc::channel(1);
    start_reading_stdin_lines(line_sender, tokio::runtime::Handle::current());

    let (watch_sender, watch_receiver) = tokio::sync::watch::channel(false);
    // this will send a shutdown signal at some point
    start_activity_until_shutdown(watch_sender);

    read_input(line_receiver, watch_receiver).await;
}

潜在改进:

  • 如果您同意 tokio_stream wrappers, this could be combined more elegantly with start_reading_stdin_lines producing a stream of lines and mapping them to typed commands. read_input could be then based on StreamMap 而不是 select!
  • 启用实验性 stdin_forwarders 功能,使用 for 循环 lines()
  • 可以更轻松地阅读行