Rust Tokio mpsc::channel 多任务程序的意外行为
Rust Tokio mpsc::channel unexpected behavior for multi-task program
在下面的程序中我使用了Tokio 的mpsc 频道。发送者被移动到名为 input_message
的任务,接收者被移动到另一个名为 printer
的任务。这两个任务在 main 函数中都是 tokio::spawn()
-ed。 input_message
任务是读取用户的输入并通过 Channel 发送。频道上的 printer
任务 recv()
获取用户的输入并简单地将其打印到标准输出:
use std::error::Error;
use tokio::sync::mpsc;
use std::io::{BufRead, Write};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let (tx, mut rx) = mpsc::unbounded_channel::<String>();
let printer = tokio::spawn(async move {
loop {
let res = rx.recv().await; // (11) Comment this ..
// let res = rx.try_recv(); // (12) Uncomment this ,,
if let Some(m) = res { // .. and this
// if let Ok(m) = res { // ,, and this
if m.trim() == "q".to_string() {
break;
}
println!("Received: {}", m.trim());
}
}
println!("Printer exited");
});
let input_message = tokio::spawn(async move {
let stdin = std::io::stdin();
let mut bufr = std::io::BufReader::new(stdin);
let mut buf = String::new();
loop {
// Let the printer thread print the string before asking the user's input.
std::thread::sleep(std::time::Duration::from_millis(1));
print!("Enter input: ");
std::io::stdout().flush().unwrap();
bufr.read_line(&mut buf).unwrap();
if buf.trim() == "q".to_string() {
tx.send(buf).unwrap();
break;
}
tx.send(buf).unwrap();
buf = String::new();
}
println!("InputMessage exited");
});
tokio::join!(input_message, printer);
Ok(())
}
程序的预期行为是:
- 要求用户随机输入(q 退出)
- 将相同的输入打印到标准输出
在第 11-13 行中使用 rx.recv().await
,程序似乎缓冲了表示用户输入的字符串:printer
任务未接收到各种输入,因此不会打印字符串到标准输出。一旦退出消息(即 q)被发送,input_message
任务退出并且消息似乎被冲出通道并且接收者立即处理它们,因此 printer
任务打印所有一次输入。这是错误输出的示例:
Enter input: Hello
Enter input: World
Enter input: q
InputMessage exited
Received: Hello
Received: World
Printer exited
我的问题是,通道怎么可能只在发送线程退出时缓冲消息并一次性处理它们,而不是在发送时接收它们?
我尝试做的是使用第 12-14 行中的 try_recv()
函数,它确实解决了问题。输出正确打印,这里是一个例子:
Enter input: Hello
Received: Hello
Enter input: World
Received: World
Enter input: q
InputMessage exited
Printer exited
鉴于此,我感到困惑。我知道 recv().await
和 try_recv()
函数之间的区别,但我认为在这种情况下还有更多我忽略的东西使后者起作用而前者不起作用。有没有人能够阐明并详细说明这一点?为什么 try_recv()
有效而 recv().await
无效,为什么 recv().await
在这种情况下无效?就效率而言,循环是 try_recv()
不好还是“不好的做法”?
这里有几点需要指出,但首先,您正在等待 std::io::stdin()
上的行,这会阻塞线程,直到行到达该流。当线程等待输入时,不能在此线程上执行其他 future,this blog post 如果您想更深入地了解为什么不应该这样做,这是一个很好的资源。
Tokio 的 io
模块为 stdin()
提供了一个异步句柄,您可以使用它作为快速修复,尽管 the documentation explicitly mentions 您应该启动一个专用的 (non-async) 用于交互式用户输入的线程,而不是使用异步句柄。
将 std::io::stdin()
替换为 tokio::io::stdin()
还需要将标准库 BufReader
替换为包装 R: AsyncRead
而不是 R: Read
的 tokio 实现。
为了防止输入任务和输出任务之间的交错写入,您可以使用响应通道,在打印输出后向输入任务发送信号。您可以发送包含以下字段的 Message
,而不是通过频道发送 String
:
struct Message {
payload: String,
done_tx: oneshot::Sender<()>,
}
读取输入行后,通过通道将 Message
发送到打印机任务。打印机任务打印 String
并通过 done_tx
发出信号,表明输入任务可以打印输入提示并等待新行。
将所有这些与其他一些更改(例如等待消息的 while 循环)放在一起,您最终会得到如下结果:
use std::error::Error;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::sync::{mpsc, oneshot};
#[derive(Debug)]
struct Message {
done_tx: oneshot::Sender<()>,
message: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
let printer = tokio::spawn(async move {
while let Some(Message {
message: m,
done_tx,
}) = rx.recv().await
{
if m.trim() == "q".to_string() {
break;
}
println!("Received: {}", m.trim());
done_tx.send(()).unwrap();
}
println!("Printer exited");
});
let input_message = tokio::spawn(async move {
let stdin = tokio::io::stdin();
let mut stdout = tokio::io::stdout();
let mut bufr = tokio::io::BufReader::new(stdin);
let mut buf = String::new();
loop {
// Let the printer thread print the string before asking the user's input.
stdout.write(b"Enter input: ").await.unwrap();
stdout.flush().await.unwrap();
bufr.read_line(&mut buf).await.unwrap();
let end = buf.trim() == "q";
let (done_tx, done) = oneshot::channel();
let message = Message {
message: buf,
done_tx,
};
tx.send(message).unwrap();
if end {
break;
}
done.await.unwrap();
buf = String::new();
}
println!("InputMessage exited");
});
tokio::join!(input_message, printer);
Ok(())
}
在下面的程序中我使用了Tokio 的mpsc 频道。发送者被移动到名为 input_message
的任务,接收者被移动到另一个名为 printer
的任务。这两个任务在 main 函数中都是 tokio::spawn()
-ed。 input_message
任务是读取用户的输入并通过 Channel 发送。频道上的 printer
任务 recv()
获取用户的输入并简单地将其打印到标准输出:
use std::error::Error;
use tokio::sync::mpsc;
use std::io::{BufRead, Write};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let (tx, mut rx) = mpsc::unbounded_channel::<String>();
let printer = tokio::spawn(async move {
loop {
let res = rx.recv().await; // (11) Comment this ..
// let res = rx.try_recv(); // (12) Uncomment this ,,
if let Some(m) = res { // .. and this
// if let Ok(m) = res { // ,, and this
if m.trim() == "q".to_string() {
break;
}
println!("Received: {}", m.trim());
}
}
println!("Printer exited");
});
let input_message = tokio::spawn(async move {
let stdin = std::io::stdin();
let mut bufr = std::io::BufReader::new(stdin);
let mut buf = String::new();
loop {
// Let the printer thread print the string before asking the user's input.
std::thread::sleep(std::time::Duration::from_millis(1));
print!("Enter input: ");
std::io::stdout().flush().unwrap();
bufr.read_line(&mut buf).unwrap();
if buf.trim() == "q".to_string() {
tx.send(buf).unwrap();
break;
}
tx.send(buf).unwrap();
buf = String::new();
}
println!("InputMessage exited");
});
tokio::join!(input_message, printer);
Ok(())
}
程序的预期行为是:
- 要求用户随机输入(q 退出)
- 将相同的输入打印到标准输出
在第 11-13 行中使用 rx.recv().await
,程序似乎缓冲了表示用户输入的字符串:printer
任务未接收到各种输入,因此不会打印字符串到标准输出。一旦退出消息(即 q)被发送,input_message
任务退出并且消息似乎被冲出通道并且接收者立即处理它们,因此 printer
任务打印所有一次输入。这是错误输出的示例:
Enter input: Hello
Enter input: World
Enter input: q
InputMessage exited
Received: Hello
Received: World
Printer exited
我的问题是,通道怎么可能只在发送线程退出时缓冲消息并一次性处理它们,而不是在发送时接收它们?
我尝试做的是使用第 12-14 行中的 try_recv()
函数,它确实解决了问题。输出正确打印,这里是一个例子:
Enter input: Hello
Received: Hello
Enter input: World
Received: World
Enter input: q
InputMessage exited
Printer exited
鉴于此,我感到困惑。我知道 recv().await
和 try_recv()
函数之间的区别,但我认为在这种情况下还有更多我忽略的东西使后者起作用而前者不起作用。有没有人能够阐明并详细说明这一点?为什么 try_recv()
有效而 recv().await
无效,为什么 recv().await
在这种情况下无效?就效率而言,循环是 try_recv()
不好还是“不好的做法”?
这里有几点需要指出,但首先,您正在等待 std::io::stdin()
上的行,这会阻塞线程,直到行到达该流。当线程等待输入时,不能在此线程上执行其他 future,this blog post 如果您想更深入地了解为什么不应该这样做,这是一个很好的资源。
Tokio 的 io
模块为 stdin()
提供了一个异步句柄,您可以使用它作为快速修复,尽管 the documentation explicitly mentions 您应该启动一个专用的 (non-async) 用于交互式用户输入的线程,而不是使用异步句柄。
将 std::io::stdin()
替换为 tokio::io::stdin()
还需要将标准库 BufReader
替换为包装 R: AsyncRead
而不是 R: Read
的 tokio 实现。
为了防止输入任务和输出任务之间的交错写入,您可以使用响应通道,在打印输出后向输入任务发送信号。您可以发送包含以下字段的 Message
,而不是通过频道发送 String
:
struct Message {
payload: String,
done_tx: oneshot::Sender<()>,
}
读取输入行后,通过通道将 Message
发送到打印机任务。打印机任务打印 String
并通过 done_tx
发出信号,表明输入任务可以打印输入提示并等待新行。
将所有这些与其他一些更改(例如等待消息的 while 循环)放在一起,您最终会得到如下结果:
use std::error::Error;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::sync::{mpsc, oneshot};
#[derive(Debug)]
struct Message {
done_tx: oneshot::Sender<()>,
message: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
let printer = tokio::spawn(async move {
while let Some(Message {
message: m,
done_tx,
}) = rx.recv().await
{
if m.trim() == "q".to_string() {
break;
}
println!("Received: {}", m.trim());
done_tx.send(()).unwrap();
}
println!("Printer exited");
});
let input_message = tokio::spawn(async move {
let stdin = tokio::io::stdin();
let mut stdout = tokio::io::stdout();
let mut bufr = tokio::io::BufReader::new(stdin);
let mut buf = String::new();
loop {
// Let the printer thread print the string before asking the user's input.
stdout.write(b"Enter input: ").await.unwrap();
stdout.flush().await.unwrap();
bufr.read_line(&mut buf).await.unwrap();
let end = buf.trim() == "q";
let (done_tx, done) = oneshot::channel();
let message = Message {
message: buf,
done_tx,
};
tx.send(message).unwrap();
if end {
break;
}
done.await.unwrap();
buf = String::new();
}
println!("InputMessage exited");
});
tokio::join!(input_message, printer);
Ok(())
}