"blocking annotated I/O must be called from the context of the Tokio runtime" 在异步任务中读取标准输入时

"blocking annotated I/O must be called from the context of the Tokio runtime" when reading stdin in async task

I'm trying to read from stdin in an async task spawned with tokio::spawn. The executor is created as

let mut executor = tokio::runtime::current_thread::Runtime::new().unwrap();

The main task is then run with executor.spawn(...), which spawns other tasks with tokio::spawn().

fn main then calls executor.run().unwrap(); to wait for all tasks to finish.

The problem is that when I do

let mut stdin = tokio::io::stdin();
let mut read_buf: [u8; 1024] = [0; 1024];
...
stdin.read(&mut read_buf).await

I get the error "blocking annotated I/O must be called from the context of the Tokio runtime".

Dependencies:

futures-preview = { version = "0.3.0-alpha.18",  features = ["async-await", "nightly"] }
tokio = "0.2.0-alpha.2"
tokio-net = "0.2.0-alpha.2"
tokio-sync = "0.2.0-alpha.2"

Full code:

extern crate futures;
extern crate tokio;
extern crate tokio_net;
extern crate tokio_sync;

use std::io::Write;
use std::net::SocketAddr;
use tokio::io::AsyncReadExt;
use tokio::net::tcp::split::{TcpStreamReadHalf, TcpStreamWriteHalf};
use tokio::net::TcpListener;
use tokio_sync::oneshot;

use futures::select;

use futures::future::FutureExt;

#[derive(Debug)]
enum AppErr {
    CantBindAddr(std::io::Error),
    CantAccept(std::io::Error),
}

fn main() {
    let mut executor = tokio::runtime::current_thread::Runtime::new().unwrap();

    executor.spawn(async {
        match server_task().await {
            Ok(()) => {}
            Err(err) => {
                println!("Error: {:?}", err);
            }
        }
    });

    executor.run().unwrap(); // ignores RunError
}

async fn server_task() -> Result<(), AppErr> {
    let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
    let mut listener = TcpListener::bind(&addr).map_err(AppErr::CantBindAddr)?;

    loop {
        print!("Waiting for incoming connection...");
        let _ = std::io::stdout().flush();
        let (socket, _) = listener.accept().await.map_err(AppErr::CantAccept)?;
        println!("{:?} connected.", socket);
        let (read, write) = socket.split();

        let (abort_in_task_snd, abort_in_task_rcv) = oneshot::channel();
        let (abort_out_task_snd, abort_out_task_rcv) = oneshot::channel();

        tokio::spawn(handle_incoming(read, abort_in_task_rcv, abort_out_task_snd));
        tokio::spawn(handle_outgoing(
            write,
            abort_out_task_rcv,
            abort_in_task_snd,
        ));
    }
}

async fn handle_incoming(
    mut conn: TcpStreamReadHalf,
    abort_in: oneshot::Receiver<()>,
    abort_out: oneshot::Sender<()>,
) {
    println!("handle_incoming");

    let mut read_buf: [u8; 1024] = [0; 1024];
    let mut abort_in_fused = abort_in.fuse();

    loop {
        select! {
            abort_ret = abort_in_fused => {
                // TODO match abort_ret {..}
                println!("abort signalled, handle_incoming returning");
                return;
            },
            bytes = conn.read(&mut read_buf).fuse() => {
                match bytes {
                    Err(io_err) => {
                        println!("io error when reading input stream: {:?}", io_err);
                        return;
                    }
                    Ok(bytes) => {
                        println!("read {} bytes: {:?}", bytes, &read_buf[0..bytes]);
                    }
                }
            }
        }
    }
}

async fn handle_outgoing(
    conn: TcpStreamWriteHalf,
    abort_in: oneshot::Receiver<()>,
    abort_out: oneshot::Sender<()>,
) {
    println!("handle_outgoing");

    let mut stdin = tokio::io::stdin();
    let mut read_buf: [u8; 1024] = [0; 1024];
    let mut abort_in_fused = abort_in.fuse();

    loop {
        select! {
            abort_ret = abort_in_fused => {
                println!("abort signalled, handle_outgoing returning");
                return;
            }
            input = stdin.read(&mut read_buf).fuse() => {
                match input {
                    Err(io_err) => {
                        println!("io error when reading stdin: {:?}", io_err);
                        return;
                    }
                    Ok(bytes) => {
                        println!("handle_outgoing read {} bytes", bytes);
                        // TODO
                    }
                }
            },
        }
    }
}

Question:

Thanks

Tokio's stdin blocks the enclosing thread from the executor's pool, because it is annotated with blocking from tokio-executor. From the reference:

When the blocking function enters, it hands off the responsibility of processing the current work queue to another thread.

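Your code does not work because the executor you are using multiplexes all tasks onto a single thread (tokio::runtime::current_thread::Runtime::new()). That leaves no other thread to run the executor's remaining tasks.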

If you configure your runtime properly (a thread pool with multiple threads), your code will work fine:

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let mut executor = rt.executor();

    executor.spawn(async {
        match server_task().await {
            Ok(()) => {}
            Err(err) => {
                println!("Error: {:?}", err);
            }
        }
    });

    rt.shutdown_on_idle();
}
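
The underlying idea, that a blocking read needs a thread of its own, can also be sketched without Tokio. The following is a minimal sketch that assumes only the Rust standard library: a dedicated OS thread performs the blocking reads and forwards each line over a channel. The helper name spawn_line_reader is hypothetical, and an in-memory Cursor stands in for stdin so the example is deterministic. This is not how Tokio implements stdin internally.

```rust
use std::io::{BufRead, Cursor};
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Spawns a dedicated OS thread that performs blocking line reads on `reader`
/// and forwards each line over a channel.
/// (`spawn_line_reader` is a hypothetical helper, not part of Tokio.)
fn spawn_line_reader<R>(reader: R) -> Receiver<String>
where
    R: BufRead + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for line in reader.lines() {
            match line {
                Ok(text) => {
                    // Stop if the receiving side has been dropped.
                    if tx.send(text).is_err() {
                        break;
                    }
                }
                // Stop on the first I/O error.
                Err(_) => break,
            }
        }
        // `tx` is dropped here, which closes the channel.
    });
    rx
}

fn main() {
    // Assumption: an in-memory reader replaces stdin for this illustration.
    // For real input, `std::io::BufReader::new(std::io::stdin())` could be
    // passed instead.
    let rx = spawn_line_reader(Cursor::new("hello\nworld\n"));

    // The loop ends once the reader thread finishes and drops the sender.
    for line in rx {
        println!("read line: {}", line);
    }
}
```

Only the reader thread ever blocks on input here. In an async program the std channel would probably be replaced by an async-aware one, so that the receiving task does not block the executor thread. That part is left out of this sketch.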

See also: How can I stop reading from a tokio::io::lines stream?