如何在不阻塞 Rust 的情况下读取子进程的输出?

How do I read the output of a child process without blocking in Rust?

我正在用 Rust 制作一个需要与子进程通信的小型 ncurses 应用程序。我已经有了一个用 Common Lisp 编写的原型。我正在尝试重写它,因为 CL 为这么小的工具使用了大量内存。

我在弄清楚如何与子流程交互时遇到了一些问题。

我目前做的大致是这样的:

  1. 创建进程:

    let mut program = match Command::new(command)
        .args(arguments)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
    {
        Ok(child) => child,
        Err(_) => {
            println!("Cannot run program '{}'.", command);
            return;
        }
    };
    
  2. 将它传递给无限(直到用户退出)循环,它读取和处理输入并侦听这样的输出(并将其写入屏幕):

    fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) {
        match program.stdout {
            Some(ref mut out) => {
                let mut buf_string = String::new();
                match out.read_to_string(&mut buf_string) {
                    Ok(_) => output_viewer.append_string(buf_string),
                    Err(_) => return,
                };
            }
            None => return,
        };
    }
    

然而,对 read_to_string 的调用会阻塞程序,直到进程退出。据我所见,read_to_endread 似乎也被阻止了。如果我尝试 运行 类似 ls 的东西会立即退出,它会起作用,但是对于像 pythonsbcl 这样不会退出的东西,它只会在我杀死手动子处理。

基于,我将代码更改为使用BufReader:

    fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) {
        match program.stdout.as_mut() {
            Some(out) => {
                let buf_reader = BufReader::new(out);
                for line in buf_reader.lines() {
                    match line {
                        Ok(l) => {
                            output_viewer.append_string(l);
                        }
                        Err(_) => return,
                    };
                }
            }
            None => return,
        }
    }

但是,问题依旧。它将读取所有可用的行,然后阻塞。由于该工具应该适用于任何程序,因此在尝试读取之前无法猜测输出何时结束。似乎也没有办法为 BufReader 设置超时。

流默认是阻塞。 TCP/IP 流、文件系统流、管道流,它们都是阻塞的。当你告诉流给你一大块字节时,它会停止并等待直到它具有给定的字节数或直到发生其他事情(interrupt,流结束,错误)。

操作系统急于return数据到读取进程,所以如果你只想等待下一行并在它进来后立即处理它那么Shepmaster建议的方法在 (以及他在这里的回答)中有效。
虽然理论上它不一定要工作,因为允许操作系统让 BufReader 等待 read 中的更多数据,但实际上操作系统更喜欢早期的“短读”而不是等待。

当您需要处理多个流时(例如 child 进程的 stdoutstderr)或,这种基于 BufReader 的简单方法变得更加危险多个进程。例如,当 child 进程等待您排空其 stderr 管道时,基于 BufReader 的方法可能会死锁,而您的进程因等待它为空 stdout.

同样,当您不希望您的程序无限期地等待 child 进程时,您不能使用 BufReader。也许您想在 child 仍在工作且没有输出时显示进度条或计时器。

如果您的操作系统恰好不急于return将数据传输到进程(更喜欢“完整读取”而不是“短读取”),则您不能使用基于 BufReader 的方法) 因为在那种情况下,child 进程打印的最后几行可能会在灰色区域结束:操作系统得到了它们,但它们的大小不足以填充 BufReader 的缓冲区.

BufReader 仅限于 Read 接口允许它对流执行的操作,它的阻塞程度不亚于底层流。为了提高效率,它将 read 分块输入,告诉操作系统尽可能多地填充可用的缓冲区。

您可能想知道为什么在这里以块的形式读取数据如此重要,为什么 BufReader 不能逐字节读取数据。问题是要从流中读取数据,我们需要操作系统的帮助。另一方面,我们不是操作系统,我们独立于它工作,以免在我们的进程出现问题时干扰它。因此,为了调用操作系统,需要转换到“内核模式”,这也可能会导致“上下文切换”。这就是为什么调用操作系统读取每个字节的代价很高。我们想要尽可能少的 OS 调用,所以我们分批获取流数据。

要在不阻塞的情况下等待流,您需要 non-blocking 流。 MIO promises to have the required non-blocking stream support for pipes, most probably with PipeReader,不过我还没看呢

流的 non-blocking 性质应该可以分块读取数据,而不管操作系统是否喜欢“短读取”。因为 non-blocking 流从不阻塞。如果流中没有数据,它会简单地告诉您。

在缺少 non-blocking 流的情况下,您将不得不求助于生成线程,以便阻塞读取将在单独的线程中执行,从而不会阻塞您的主线程。您可能还想逐字节读取流,以便在操作系统不喜欢“短读取”的情况下立即对行分隔符做出反应。这是一个工作示例:https://gist.github.com/ArtemGr/db40ae04b431a95f2b78.

P.S。这是一个允许通过共享字节向量监视程序标准输出的函数示例:

use std::io::Read;
use std::process::{Command, Stdio};
use std::sync::{Arc, Mutex};
use std::thread;

/// Pipe streams are blocking, we need separate threads to monitor them without blocking the primary thread.
fn child_stream_to_vec<R>(mut stream: R) -> Arc<Mutex<Vec<u8>>>
where
    R: Read + Send + 'static,
{
    let out = Arc::new(Mutex::new(Vec::new()));
    let vec = out.clone();
    thread::Builder::new()
        .name("child_stream_to_vec".into())
        .spawn(move || loop {
            let mut buf = [0];
            match stream.read(&mut buf) {
                Err(err) => {
                    println!("{}] Error reading from stream: {}", line!(), err);
                    break;
                }
                Ok(got) => {
                    if got == 0 {
                        break;
                    } else if got == 1 {
                        vec.lock().expect("!lock").push(buf[0])
                    } else {
                        println!("{}] Unexpected number of bytes: {}", line!(), got);
                        break;
                    }
                }
            }
        })
        .expect("!thread");
    out
}

fn main() {
    let mut cat = Command::new("cat")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .expect("!cat");

    let out = child_stream_to_vec(cat.stdout.take().expect("!stdout"));
    let err = child_stream_to_vec(cat.stderr.take().expect("!stderr"));
    let mut stdin = match cat.stdin.take() {
        Some(stdin) => stdin,
        None => panic!("!stdin"),
    };
}

我用它来控制 SSH 会话:

try_s! (stdin.write_all (b"echo hello world\n"));
try_s! (wait_forˢ (&out, 0.1, 9., |s| s == "hello world\n"));

P.S。请注意 read call in async-std is blocking as well. It's just instead of blocking a system thread it only blocks a chain of futures (a stack-less green thread essentially). The poll_read is the non-blocking interface. In async-std#499 上的 await 我问过开发人员这些 API 是否有短读保证。

P.S。可能有a similar concern in Nom:“我们会告诉IO端根据解析器的结果(不完整或不完整)重新填充

P.S。看看流式阅读是如何在 crossterm 中实现的可能会很有趣。对于 Windows,在 poll.rs, they are using the native WaitForMultipleObjects. In unix.rs 他们正在使用 mio poll

东京的Command

这里是一个使用tokio0.2的例子:

use std::process::Stdio;
use futures::StreamExt; // 0.3.1
use tokio::{io::BufReader, prelude::*, process::Command}; // 0.2.4, features = ["full"]

#[tokio::main]
async fn main() {
    let mut cmd = Command::new("/tmp/slow.bash")
        .stdout(Stdio::piped()) // Can do the same for stderr
        .spawn()
        .expect("cannot spawn");

    let stdout = cmd.stdout().take().expect("no stdout");
    // Can do the same for stderr

    // To print out each line
    // BufReader::new(stdout)
    //     .lines()
    //     .for_each(|s| async move { println!("> {:?}", s) })
    //     .await;

    // To print out each line *and* collect it all into a Vec
    let result: Vec<_> = BufReader::new(stdout)
        .lines()
        .inspect(|s| println!("> {:?}", s))
        .collect()
        .await;

    println!("All the lines: {:?}", result);
}

Tokio-Threadpool

这里是一个使用tokio0.1和tokio-threadpool的例子。我们使用 blocking 函数在线程中启动进程。我们将其转换为 stream::poll_fn

的流
use std::process::{Command, Stdio};
use tokio::{prelude::*, runtime::Runtime}; // 0.1.18
use tokio_threadpool; // 0.1.13

fn stream_command_output(
    mut command: Command,
) -> impl Stream<Item = Vec<u8>, Error = tokio_threadpool::BlockingError> {
    // Ensure that the output is available to read from and start the process
    let mut child = command
        .stdout(Stdio::piped())
        .spawn()
        .expect("cannot spawn");
    let mut stdout = child.stdout.take().expect("no stdout");

    // Create a stream of data
    stream::poll_fn(move || {
        // Perform blocking IO
        tokio_threadpool::blocking(|| {
            // Allocate some space to store anything read
            let mut data = vec![0; 128];
            // Read 1-128 bytes of data
            let n_bytes_read = stdout.read(&mut data).expect("cannot read");

            if n_bytes_read == 0 {
                // Stdout is done
                None
            } else {
                // Only return as many bytes as we read
                data.truncate(n_bytes_read);
                Some(data)
            }
        })
    })
}

fn main() {
    let output_stream = stream_command_output(Command::new("/tmp/slow.bash"));

    let mut runtime = Runtime::new().expect("Unable to start the runtime");

    let result = runtime.block_on({
        output_stream
            .map(|d| String::from_utf8(d).expect("Not UTF-8"))
            .fold(Vec::new(), |mut v, s| {
                print!("> {}", s);
                v.push(s);
                Ok(v)
            })
    });

    println!("All the lines: {:?}", result);
}

这里可以做出许多可能的权衡。例如,总是分配 128 个字节并不理想,但实现起来很简单。

支持

供参考,这里是slow.bash:

#!/usr/bin/env bash

set -eu

val=0

while [[ $val -lt 10 ]]; do
    echo $val
    val=$(($val + 1))
    sleep 1
done

另请参阅:

如果 Unix 支持足够,您还可以使两个输出流成为非阻塞的并轮询它们,就像在 TcpStream 上使用 set_nonblocking 函数那样。

Command spawn返回的ChildStdoutChildStderrStdio(并且包含一个文件描述符),可以直接修改这些handle的读行为使其非-阻塞。

基于jcreekmore/timeout-readwrite-rs and anowell/nonblock-rs的工作,我使用这个包装器来修改流句柄:

extern crate libc;
use std::io::Read;
use std::os::unix::io::AsRawFd;
use libc::{F_GETFL, F_SETFL, fcntl, O_NONBLOCK};

fn set_nonblocking<H>(handle: &H, nonblocking: bool) -> std::io::Result<()>
where
    H: Read + AsRawFd,
{
    let fd = handle.as_raw_fd();
    let flags = unsafe { fcntl(fd, F_GETFL, 0) };
    if flags < 0 {
        return Err(std::io::Error::last_os_error());
    }
    let flags = if nonblocking{
        flags | O_NONBLOCK
    } else {
        flags & !O_NONBLOCK
    };
    let res = unsafe { fcntl(fd, F_SETFL, flags) };
    if res != 0 {
        return Err(std::io::Error::last_os_error());
    }
    Ok(())
}

您可以像管理任何其他非阻塞流一样管理这两个流。下面的例子是基于 polling crate,这使得读取事件和 BufReader 用于行读取变得非常容易:

use std::process::{Command, Stdio};
use std::path::PathBuf;
use std::io::{BufReader, BufRead};
use std::thread;
extern crate polling;
use polling::{Event, Poller};

fn main() -> Result<(), std::io::Error> {
    let path = PathBuf::from("./worker.sh").canonicalize()?;

    let mut child = Command::new(path)
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .expect("Failed to start worker");

    let handle = thread::spawn({
        let stdout = child.stdout.take().unwrap();
        set_nonblocking(&stdout, true)?;
        let mut reader_out = BufReader::new(stdout);

        let stderr = child.stderr.take().unwrap();
        set_nonblocking(&stderr, true)?;
        let mut reader_err = BufReader::new(stderr);

        move || {
            let key_out = 1;
            let key_err = 2;
            let mut out_closed = false;
            let mut err_closed = false;

            let poller = Poller::new().unwrap();
            poller.add(reader_out.get_ref(), Event::readable(key_out)).unwrap();
            poller.add(reader_err.get_ref(), Event::readable(key_err)).unwrap();

            let mut line = String::new();
            let mut events = Vec::new();
            loop {
                // Wait for at least one I/O event.
                events.clear();
                poller.wait(&mut events, None).unwrap();

                for ev in &events {
                    // stdout is ready for reading
                    if ev.key == key_out {
                        let len = match reader_out.read_line(&mut line) {
                            Ok(len) => len,
                            Err(e) => {
                                println!("stdout read returned error: {}", e);
                                0
                            }
                        };
                        if len == 0 {
                            println!("stdout closed (len is null)");
                            out_closed = true;
                            poller.delete(reader_out.get_ref()).unwrap();
                        } else {
                            print!("[STDOUT] {}", line);
                            line.clear();
                            // reload the poller
                            poller.modify(reader_out.get_ref(), Event::readable(key_out)).unwrap();
                        }
                    }

                    // stderr is ready for reading
                    if ev.key == key_err {
                        let len = match reader_err.read_line(&mut line) {
                            Ok(len) => len,
                            Err(e) => {
                                println!("stderr read returned error: {}", e);
                                0
                            }
                        };
                        if len == 0 {
                            println!("stderr closed (len is null)");
                            err_closed = true;
                            poller.delete(reader_err.get_ref()).unwrap();
                        } else {
                            print!("[STDERR] {}", line);
                            line.clear();
                            // reload the poller
                            poller.modify(reader_err.get_ref(), Event::readable(key_err)).unwrap();
                        }
                    }
                }

                if out_closed && err_closed {
                    println!("Stream closed, exiting process thread");
                    break;
                }
            }
        }
    });

    handle.join().unwrap();
    Ok(())
}

此外,与 EventFd 上的包装器一起使用,可以轻松地从另一个线程停止进程,而不会阻塞,也不会主动轮询,并且只使用一个线程。

编辑: 在我的测试之后,轮询箱似乎自动将轮询句柄设置为非阻塞模式。如果您想直接使用 nix::poll 对象,set_nonblocking 函数仍然有用。

我遇到了足够多的用例,在这些用例中,通过行分隔文本与子进程交互非常有用,我为此编写了一个箱子,interactive_process

我希望原来的问题早就解决了,但我认为这可能对其他人有所帮助。