"blocking annotated I/O must be called from the context of the Tokio runtime" when reading stdin in async task
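I am trying to read from stdin inside an async task spawned with tokio::spawn. The executor is created as

    let mut executor = tokio::runtime::current_thread::Runtime::new().unwrap();

The main task is then run with executor.spawn(...), and it spawns other tasks with tokio::spawn(). fn main then calls executor.run().unwrap(); to wait for all tasks to finish.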
The problem is that when I do

    let mut stdin = tokio::io::stdin();
    let mut read_buf: [u8; 1024] = [0; 1024];
    ...
    stdin.read(&mut read_buf).await

I get the error "blocking annotated I/O must be called from the context of the Tokio runtime".
Dependencies:

    futures-preview = { version = "0.3.0-alpha.18", features = ["async-await", "nightly"] }
    tokio = "0.2.0-alpha.2"
    tokio-net = "0.2.0-alpha.2"
    tokio-sync = "0.2.0-alpha.2"

Full code:

    extern crate futures;
    extern crate tokio;
    extern crate tokio_net;
    extern crate tokio_sync;

    use std::io::Write;
    use std::net::SocketAddr;
    use tokio::io::AsyncReadExt;
    use tokio::net::tcp::split::{TcpStreamReadHalf, TcpStreamWriteHalf};
    use tokio::net::TcpListener;
    use tokio_sync::oneshot;

    use futures::select;
    use futures::future::FutureExt;

    #[derive(Debug)]
    enum AppErr {
        CantBindAddr(std::io::Error),
        CantAccept(std::io::Error),
    }

    fn main() {
        let mut executor = tokio::runtime::current_thread::Runtime::new().unwrap();
        executor.spawn(async {
            match server_task().await {
                Ok(()) => {}
                Err(err) => {
                    println!("Error: {:?}", err);
                }
            }
        });
        executor.run().unwrap(); // ignores RunError
    }

    async fn server_task() -> Result<(), AppErr> {
        let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
        let mut listener = TcpListener::bind(&addr).map_err(AppErr::CantBindAddr)?;
        loop {
            print!("Waiting for incoming connection...");
            let _ = std::io::stdout().flush();
            let (socket, _) = listener.accept().await.map_err(AppErr::CantAccept)?;
            println!("{:?} connected.", socket);
            let (read, write) = socket.split();
            let (abort_in_task_snd, abort_in_task_rcv) = oneshot::channel();
            let (abort_out_task_snd, abort_out_task_rcv) = oneshot::channel();
            tokio::spawn(handle_incoming(read, abort_in_task_rcv, abort_out_task_snd));
            tokio::spawn(handle_outgoing(
                write,
                abort_out_task_rcv,
                abort_in_task_snd,
            ));
        }
    }

    async fn handle_incoming(
        mut conn: TcpStreamReadHalf,
        abort_in: oneshot::Receiver<()>,
        abort_out: oneshot::Sender<()>,
    ) {
        println!("handle_incoming");
        let mut read_buf: [u8; 1024] = [0; 1024];
        let mut abort_in_fused = abort_in.fuse();
        loop {
            select! {
                abort_ret = abort_in_fused => {
                    // TODO match abort_ret {..}
                    println!("abort signalled, handle_incoming returning");
                    return;
                },
                bytes = conn.read(&mut read_buf).fuse() => {
                    match bytes {
                        Err(io_err) => {
                            println!("io error when reading input stream: {:?}", io_err);
                            return;
                        }
                        Ok(bytes) => {
                            println!("read {} bytes: {:?}", bytes, &read_buf[0..bytes]);
                        }
                    }
                }
            }
        }
    }

    async fn handle_outgoing(
        conn: TcpStreamWriteHalf,
        abort_in: oneshot::Receiver<()>,
        abort_out: oneshot::Sender<()>,
    ) {
        println!("handle_outgoing");
        let mut stdin = tokio::io::stdin();
        let mut read_buf: [u8; 1024] = [0; 1024];
        let mut abort_in_fused = abort_in.fuse();
        loop {
            select! {
                abort_ret = abort_in_fused => {
                    println!("abort signalled, handle_outgoing returning");
                    return;
                }
                input = stdin.read(&mut read_buf).fuse() => {
                    match input {
                        Err(io_err) => {
                            println!("io error when reading stdin: {:?}", io_err);
                            return;
                        }
                        Ok(bytes) => {
                            println!("handle_outgoing read {} bytes", bytes);
                            // TODO
                        }
                    }
                },
            }
        }
    }

Questions:
- Is my task spawning right? Can I safely call tokio::spawn in the main task passed to executor.spawn()?
- What is wrong with the way I read stdin in this program?

Thanks
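
Tokio's stdin blocks the enclosing thread from the executor pool, because it is annotated with blocking from tokio-executor. From the reference:

"When the blocking function enters, it hands off the responsibility of processing the current work queue to another thread."

Your code does not work because the executor you are using multiplexes all tasks on a single thread (tokio::runtime::current_thread::Runtime::new()). There is therefore no other thread that could take over the executor's remaining tasks.

If you configure the runtime properly (a thread pool with several threads), your code works fine: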

    fn main() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let mut executor = rt.executor();
        executor.spawn(async {
            match server_task().await {
                Ok(()) => {}
                Err(err) => {
                    println!("Error: {:?}", err);
                }
            }
        });
        rt.shutdown_on_idle();
    }

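To make the hand-off idea from the quoted reference more concrete, here is a minimal sketch that uses only the standard library, not Tokio. It is not how Tokio implements blocking; it only illustrates the general pattern of moving a blocking read onto a separate thread so that the thread driving the rest of the program is not the one stuck in the read. The helper name spawn_line_reader is hypothetical and exists only for this example.

```rust
use std::io::{self, BufRead, BufReader, Read};
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical helper (not part of Tokio): performs the blocking reads on a
/// dedicated thread and forwards each line over a channel.
fn spawn_line_reader<R>(input: R) -> Receiver<io::Result<String>>
where
    R: Read + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Each call to the underlying read blocks this thread only.
        for line in BufReader::new(input).lines() {
            // Stop if the receiving side has been dropped.
            if tx.send(line).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which closes the channel on end of input.
    });
    rx
}

fn main() {
    let lines = spawn_line_reader(io::stdin());
    // Iterating the receiver ends once the reader thread closes the channel.
    for line in lines {
        match line {
            Ok(text) => println!("read {} bytes", text.len()),
            Err(err) => {
                eprintln!("io error when reading stdin: {:?}", err);
                break;
            }
        }
    }
}
```

In this sketch the main thread simply waits on the channel; a real program would check the channel alongside its other work. With a single-threaded executor there is no second thread to play the role of the reader thread, which is the situation described above.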
See also: How can I stop reading from a tokio::io::lines stream?