当使用 tokio::time::timeout 确定是否已经过了太多时间时程序挂起

Program hangs when using tokio::time::timeout to determine if too much time has passed

考虑以下代码,输入是一个简单的 csv 文件,如 https://github.com/gophercises/quiz/blob/master/problems.csv 中的文件:

#[derive(Debug)]
struct Problem {
    q: String,
    a: String,
}

impl Problem {
    pub fn new(q: &str, a: &str) -> Problem {
        Self {
            q: q.to_owned(),
            a: a.to_owned(),
        }
    }
}

const FILE_NAME: &str = "../../input/problems.csv";

#[tokio::main]
async fn main() {
    let probs = parse_lines().expect("Could not parse CSV file");

    println!("Banana quiz about to start. Press enter when ready.");
    let mut buf = String::new();
    match std::io::stdin().read_line(&mut buf).ok() {
        None => {
            println!("Error reading user input");
            std::process::exit(1)
        }
        _ => {}
    }

    let mut correct_ans = 0;

    for p in &probs {
        println!("What banana? {}", p.q);
        let (banana_s, mut banana_r) = mpsc::channel(512);

        tokio::spawn(async move {
            let mut banana = String::new();
            std::io::stdin().read_line(&mut banana);
            banana.pop();
            banana_s.send(banana).await.unwrap();
        });

        let mut banana = String::new();
        match tokio::time::timeout(Duration::from_secs(5), banana_r.recv()).await {
            Ok(opt) => {
                match opt {
                    Some(b) => banana.push_str(&b),
                    None => {},
                }
            },
            Err (_) => {
                println!("Only have 5 seconds to input the answer!");
                return;
            }
        };

        println!("Your Banana: {}, Correct Banana: {}\n", banana, p.a);

        if banana != p.a {
            println!("BAD BANANA!");
            break;
        }

        println!("good banana!");
        println!("------------");
        correct_ans += 1;
    }
    println!("Correct answers: {}/{}\n", correct_ans, probs.len())
}

fn parse_lines() -> Result<Vec<Problem>, csv::Error> {
    let mut builder = ReaderBuilder::new();
    builder.has_headers(false);

    let mut reader = builder.from_path(FILE_NAME)?;
    let mut probs = Vec::new();

    for r in reader.records() {
        let rec = r?;
        probs.push(Problem::new(&rec[0], &rec[1]));
    }

    Ok(probs)
}

除用户等待输入答案的时间过长(5 秒)外,一切正常。

打印消息 Only have 5 seconds to input the answer 后,程序不会 return 除非按回车键。一旦按下回车键,程序就会出现恐慌并显示以下消息:

thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: SendError("")', src/main.rs:46:41

第 46 行是在通道上发送字符串的行:

banana_s.send(banana).await.unwrap();

我的理解是,发生这种情况是因为使用 tokio 生成的线程仍被 read_line() 函数阻塞。如果使用 std::threadstd::sync::mpsc::channel,则不会发生同样的事情,实际上程序会愉快地退出。

这是为什么?以及如何让程序在超时到期后关闭?

我查看了这个 post(非常相似),但调整它的解决方案对我不起作用。

你的理解似乎是正确的。此问题是由 async 上下文中的阻塞引起的。

假设问题没有及时回答,超时触发,用户收到一条消息,而你 return 来自 main()。那么程序应该退出了吧?

好吧,没那么快。由于您实际上是从由 tokio::main 创建的运行时执行的 async 块返回,该运行时需要关闭。标准 tokio 运行时不需要等待未完成的任务 完成 ,但确实需要它们 产生 执行。 tokio::spawn-d 块是这些任务之一,但不幸的是,std::io::stdin().read_line(&mut banana) 是一个 阻塞 调用,这意味着它在完成之前不会屈服(即当你再次按下回车键时)。

即使完成了,它也不会立即让步并执行到 banana_s.send(banana),returns 这是一个错误,因为接收器 banana_r 已经被销毁因此永远不会收到消息。请参阅 tokio::sync::mpsc 文档中的 Disconnection

这就是为什么你会得到你所看到的行为。

这种机制可以在非异步机制(std::threadstd::sync::mpsc)下正常工作,因为从 main() 返回 结束进程,这意味着其他线程会立即销毁而无需他们的参与。


要解决此问题,您应该使用 thread 而不是 task 来接受用户输入。您可能希望从每个问题具有不同的通道转变为具有一个行为类似于输入线流的通道。否则,您将尝试为每个线程生成新的阻塞线程,这可能会吞噬来自先前超时的新输入。

let (banana_s, mut banana_r) = mpsc::channel(512);
std::thread::spawn(move || loop {
    let mut banana = String::new();
    std::io::stdin().read_line(&mut banana).unwrap();
    banana.pop();
    banana_s.blocking_send(banana).unwrap(); // use blocking instead of async to send()
});

for p in &probs { // ...

除非在超时时退出程序是长期计划,否则之前是否发生超时都没有关系,因为无论如何它都会停止 运行。