如何在不阻塞 Rust 的情况下读取子进程的输出?
How do I read the output of a child process without blocking in Rust?
我正在用 Rust 制作一个需要与子进程通信的小型 ncurses 应用程序。我已经有了一个用 Common Lisp 编写的原型。我正在尝试重写它,因为 CL 为这么小的工具使用了大量内存。
我在弄清楚如何与子流程交互时遇到了一些问题。
我目前做的大致是这样的:
创建进程:
let mut program = match Command::new(command)
.args(arguments)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Ok(child) => child,
Err(_) => {
println!("Cannot run program '{}'.", command);
return;
}
};
将它传递给无限(直到用户退出)循环,它读取和处理输入并侦听这样的输出(并将其写入屏幕):
fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) {
match program.stdout {
Some(ref mut out) => {
let mut buf_string = String::new();
match out.read_to_string(&mut buf_string) {
Ok(_) => output_viewer.append_string(buf_string),
Err(_) => return,
};
}
None => return,
};
}
然而,对 read_to_string
的调用会阻塞程序,直到进程退出。据我所见,read_to_end
和 read
似乎也被阻止了。如果我尝试 运行 类似 ls
的东西会立即退出,它会起作用,但是对于像 python
或 sbcl
这样不会退出的东西,它只会在我杀死手动子处理。
基于,我将代码更改为使用BufReader
:
fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) {
match program.stdout.as_mut() {
Some(out) => {
let buf_reader = BufReader::new(out);
for line in buf_reader.lines() {
match line {
Ok(l) => {
output_viewer.append_string(l);
}
Err(_) => return,
};
}
}
None => return,
}
}
但是,问题依旧。它将读取所有可用的行,然后阻塞。由于该工具应该适用于任何程序,因此在尝试读取之前无法猜测输出何时结束。似乎也没有办法为 BufReader
设置超时。
流默认是阻塞。 TCP/IP 流、文件系统流、管道流,它们都是阻塞的。当你告诉流给你一大块字节时,它会停止并等待直到它具有给定的字节数或直到发生其他事情(interrupt,流结束,错误)。
操作系统急于return数据到读取进程,所以如果你只想等待下一行并在它进来后立即处理它那么Shepmaster建议的方法在 (以及他在这里的回答)中有效。
虽然理论上它不一定要工作,因为允许操作系统让 BufReader
等待 read
中的更多数据,但实际上操作系统更喜欢早期的“短读”而不是等待。
当您需要处理多个流时(例如 child 进程的 stdout
和 stderr
)或,这种基于 BufReader
的简单方法变得更加危险多个进程。例如,当 child 进程等待您排空其 stderr
管道时,基于 BufReader
的方法可能会死锁,而您的进程因等待它为空 stdout
.
同样,当您不希望您的程序无限期地等待 child 进程时,您不能使用 BufReader
。也许您想在 child 仍在工作且没有输出时显示进度条或计时器。
如果您的操作系统恰好不急于return将数据传输到进程(更喜欢“完整读取”而不是“短读取”),则您不能使用基于 BufReader
的方法) 因为在那种情况下,child 进程打印的最后几行可能会在灰色区域结束:操作系统得到了它们,但它们的大小不足以填充 BufReader
的缓冲区.
BufReader
仅限于 Read
接口允许它对流执行的操作,它的阻塞程度不亚于底层流。为了提高效率,它将 read 分块输入,告诉操作系统尽可能多地填充可用的缓冲区。
您可能想知道为什么在这里以块的形式读取数据如此重要,为什么 BufReader
不能逐字节读取数据。问题是要从流中读取数据,我们需要操作系统的帮助。另一方面,我们不是操作系统,我们独立于它工作,以免在我们的进程出现问题时干扰它。因此,为了调用操作系统,需要转换到“内核模式”,这也可能会导致“上下文切换”。这就是为什么调用操作系统读取每个字节的代价很高。我们想要尽可能少的 OS 调用,所以我们分批获取流数据。
要在不阻塞的情况下等待流,您需要 non-blocking 流。 MIO promises to have the required non-blocking stream support for pipes, most probably with PipeReader,不过我还没看呢
流的 non-blocking 性质应该可以分块读取数据,而不管操作系统是否喜欢“短读取”。因为 non-blocking 流从不阻塞。如果流中没有数据,它会简单地告诉您。
在缺少 non-blocking 流的情况下,您将不得不求助于生成线程,以便阻塞读取将在单独的线程中执行,从而不会阻塞您的主线程。您可能还想逐字节读取流,以便在操作系统不喜欢“短读取”的情况下立即对行分隔符做出反应。这是一个工作示例:https://gist.github.com/ArtemGr/db40ae04b431a95f2b78.
P.S。这是一个允许通过共享字节向量监视程序标准输出的函数示例:
use std::io::Read;
use std::process::{Command, Stdio};
use std::sync::{Arc, Mutex};
use std::thread;
/// Pipe streams are blocking, we need separate threads to monitor them without blocking the primary thread.
fn child_stream_to_vec<R>(mut stream: R) -> Arc<Mutex<Vec<u8>>>
where
R: Read + Send + 'static,
{
let out = Arc::new(Mutex::new(Vec::new()));
let vec = out.clone();
thread::Builder::new()
.name("child_stream_to_vec".into())
.spawn(move || loop {
let mut buf = [0];
match stream.read(&mut buf) {
Err(err) => {
println!("{}] Error reading from stream: {}", line!(), err);
break;
}
Ok(got) => {
if got == 0 {
break;
} else if got == 1 {
vec.lock().expect("!lock").push(buf[0])
} else {
println!("{}] Unexpected number of bytes: {}", line!(), got);
break;
}
}
}
})
.expect("!thread");
out
}
fn main() {
let mut cat = Command::new("cat")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("!cat");
let out = child_stream_to_vec(cat.stdout.take().expect("!stdout"));
let err = child_stream_to_vec(cat.stderr.take().expect("!stderr"));
let mut stdin = match cat.stdin.take() {
Some(stdin) => stdin,
None => panic!("!stdin"),
};
}
我用它来控制 SSH 会话:
try_s! (stdin.write_all (b"echo hello world\n"));
try_s! (wait_forˢ (&out, 0.1, 9., |s| s == "hello world\n"));
P.S。请注意 read call in async-std is blocking as well. It's just instead of blocking a system thread it only blocks a chain of futures (a stack-less green thread essentially). The poll_read is the non-blocking interface. In async-std#499 上的 await
我问过开发人员这些 API 是否有短读保证。
P.S。可能有a similar concern in Nom:“我们会告诉IO端根据解析器的结果(不完整或不完整)重新填充”
P.S。看看流式阅读是如何在 crossterm 中实现的可能会很有趣。对于 Windows,在 poll.rs, they are using the native WaitForMultipleObjects. In unix.rs 他们正在使用 mio poll
。
东京的Command
这里是一个使用tokio0.2的例子:
use std::process::Stdio;
use futures::StreamExt; // 0.3.1
use tokio::{io::BufReader, prelude::*, process::Command}; // 0.2.4, features = ["full"]
#[tokio::main]
async fn main() {
let mut cmd = Command::new("/tmp/slow.bash")
.stdout(Stdio::piped()) // Can do the same for stderr
.spawn()
.expect("cannot spawn");
let stdout = cmd.stdout().take().expect("no stdout");
// Can do the same for stderr
// To print out each line
// BufReader::new(stdout)
// .lines()
// .for_each(|s| async move { println!("> {:?}", s) })
// .await;
// To print out each line *and* collect it all into a Vec
let result: Vec<_> = BufReader::new(stdout)
.lines()
.inspect(|s| println!("> {:?}", s))
.collect()
.await;
println!("All the lines: {:?}", result);
}
Tokio-Threadpool
这里是一个使用tokio0.1和tokio-threadpool的例子。我们使用 blocking
函数在线程中启动进程。我们将其转换为 stream::poll_fn
的流
use std::process::{Command, Stdio};
use tokio::{prelude::*, runtime::Runtime}; // 0.1.18
use tokio_threadpool; // 0.1.13
fn stream_command_output(
mut command: Command,
) -> impl Stream<Item = Vec<u8>, Error = tokio_threadpool::BlockingError> {
// Ensure that the output is available to read from and start the process
let mut child = command
.stdout(Stdio::piped())
.spawn()
.expect("cannot spawn");
let mut stdout = child.stdout.take().expect("no stdout");
// Create a stream of data
stream::poll_fn(move || {
// Perform blocking IO
tokio_threadpool::blocking(|| {
// Allocate some space to store anything read
let mut data = vec![0; 128];
// Read 1-128 bytes of data
let n_bytes_read = stdout.read(&mut data).expect("cannot read");
if n_bytes_read == 0 {
// Stdout is done
None
} else {
// Only return as many bytes as we read
data.truncate(n_bytes_read);
Some(data)
}
})
})
}
fn main() {
let output_stream = stream_command_output(Command::new("/tmp/slow.bash"));
let mut runtime = Runtime::new().expect("Unable to start the runtime");
let result = runtime.block_on({
output_stream
.map(|d| String::from_utf8(d).expect("Not UTF-8"))
.fold(Vec::new(), |mut v, s| {
print!("> {}", s);
v.push(s);
Ok(v)
})
});
println!("All the lines: {:?}", result);
}
这里可以做出许多可能的权衡。例如,总是分配 128 个字节并不理想,但实现起来很简单。
支持
供参考,这里是slow.bash:
#!/usr/bin/env bash
set -eu
val=0
while [[ $val -lt 10 ]]; do
echo $val
val=$(($val + 1))
sleep 1
done
另请参阅:
如果 Unix 支持足够,您还可以使两个输出流成为非阻塞的并轮询它们,就像在 TcpStream
上使用 set_nonblocking
函数那样。
Command spawn返回的ChildStdout
和ChildStderr
是Stdio
(并且包含一个文件描述符),可以直接修改这些handle的读行为使其非-阻塞。
基于jcreekmore/timeout-readwrite-rs and anowell/nonblock-rs的工作,我使用这个包装器来修改流句柄:
extern crate libc;
use std::io::Read;
use std::os::unix::io::AsRawFd;
use libc::{F_GETFL, F_SETFL, fcntl, O_NONBLOCK};
fn set_nonblocking<H>(handle: &H, nonblocking: bool) -> std::io::Result<()>
where
H: Read + AsRawFd,
{
let fd = handle.as_raw_fd();
let flags = unsafe { fcntl(fd, F_GETFL, 0) };
if flags < 0 {
return Err(std::io::Error::last_os_error());
}
let flags = if nonblocking{
flags | O_NONBLOCK
} else {
flags & !O_NONBLOCK
};
let res = unsafe { fcntl(fd, F_SETFL, flags) };
if res != 0 {
return Err(std::io::Error::last_os_error());
}
Ok(())
}
您可以像管理任何其他非阻塞流一样管理这两个流。下面的例子是基于 polling crate,这使得读取事件和 BufReader
用于行读取变得非常容易:
use std::process::{Command, Stdio};
use std::path::PathBuf;
use std::io::{BufReader, BufRead};
use std::thread;
extern crate polling;
use polling::{Event, Poller};
fn main() -> Result<(), std::io::Error> {
let path = PathBuf::from("./worker.sh").canonicalize()?;
let mut child = Command::new(path)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("Failed to start worker");
let handle = thread::spawn({
let stdout = child.stdout.take().unwrap();
set_nonblocking(&stdout, true)?;
let mut reader_out = BufReader::new(stdout);
let stderr = child.stderr.take().unwrap();
set_nonblocking(&stderr, true)?;
let mut reader_err = BufReader::new(stderr);
move || {
let key_out = 1;
let key_err = 2;
let mut out_closed = false;
let mut err_closed = false;
let poller = Poller::new().unwrap();
poller.add(reader_out.get_ref(), Event::readable(key_out)).unwrap();
poller.add(reader_err.get_ref(), Event::readable(key_err)).unwrap();
let mut line = String::new();
let mut events = Vec::new();
loop {
// Wait for at least one I/O event.
events.clear();
poller.wait(&mut events, None).unwrap();
for ev in &events {
// stdout is ready for reading
if ev.key == key_out {
let len = match reader_out.read_line(&mut line) {
Ok(len) => len,
Err(e) => {
println!("stdout read returned error: {}", e);
0
}
};
if len == 0 {
println!("stdout closed (len is null)");
out_closed = true;
poller.delete(reader_out.get_ref()).unwrap();
} else {
print!("[STDOUT] {}", line);
line.clear();
// reload the poller
poller.modify(reader_out.get_ref(), Event::readable(key_out)).unwrap();
}
}
// stderr is ready for reading
if ev.key == key_err {
let len = match reader_err.read_line(&mut line) {
Ok(len) => len,
Err(e) => {
println!("stderr read returned error: {}", e);
0
}
};
if len == 0 {
println!("stderr closed (len is null)");
err_closed = true;
poller.delete(reader_err.get_ref()).unwrap();
} else {
print!("[STDERR] {}", line);
line.clear();
// reload the poller
poller.modify(reader_err.get_ref(), Event::readable(key_err)).unwrap();
}
}
}
if out_closed && err_closed {
println!("Stream closed, exiting process thread");
break;
}
}
}
});
handle.join().unwrap();
Ok(())
}
此外,与 EventFd 上的包装器一起使用,可以轻松地从另一个线程停止进程,而不会阻塞,也不会主动轮询,并且只使用一个线程。
编辑: 在我的测试之后,轮询箱似乎自动将轮询句柄设置为非阻塞模式。如果您想直接使用 nix::poll 对象,set_nonblocking 函数仍然有用。
我遇到了足够多的用例,在这些用例中,通过行分隔文本与子进程交互非常有用,我为此编写了一个箱子,interactive_process。
我希望原来的问题早就解决了,但我认为这可能对其他人有所帮助。
我正在用 Rust 制作一个需要与子进程通信的小型 ncurses 应用程序。我已经有了一个用 Common Lisp 编写的原型。我正在尝试重写它,因为 CL 为这么小的工具使用了大量内存。
我在弄清楚如何与子流程交互时遇到了一些问题。
我目前做的大致是这样的:
创建进程:
let mut program = match Command::new(command) .args(arguments) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() { Ok(child) => child, Err(_) => { println!("Cannot run program '{}'.", command); return; } };
将它传递给无限(直到用户退出)循环,它读取和处理输入并侦听这样的输出(并将其写入屏幕):
fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) { match program.stdout { Some(ref mut out) => { let mut buf_string = String::new(); match out.read_to_string(&mut buf_string) { Ok(_) => output_viewer.append_string(buf_string), Err(_) => return, }; } None => return, }; }
然而,对 read_to_string
的调用会阻塞程序,直到进程退出。据我所见,read_to_end
和 read
似乎也被阻止了。如果我尝试 运行 类似 ls
的东西会立即退出,它会起作用,但是对于像 python
或 sbcl
这样不会退出的东西,它只会在我杀死手动子处理。
基于BufReader
:
fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) {
match program.stdout.as_mut() {
Some(out) => {
let buf_reader = BufReader::new(out);
for line in buf_reader.lines() {
match line {
Ok(l) => {
output_viewer.append_string(l);
}
Err(_) => return,
};
}
}
None => return,
}
}
但是,问题依旧。它将读取所有可用的行,然后阻塞。由于该工具应该适用于任何程序,因此在尝试读取之前无法猜测输出何时结束。似乎也没有办法为 BufReader
设置超时。
流默认是阻塞。 TCP/IP 流、文件系统流、管道流,它们都是阻塞的。当你告诉流给你一大块字节时,它会停止并等待直到它具有给定的字节数或直到发生其他事情(interrupt,流结束,错误)。
操作系统急于return数据到读取进程,所以如果你只想等待下一行并在它进来后立即处理它那么Shepmaster建议的方法在
虽然理论上它不一定要工作,因为允许操作系统让 BufReader
等待 read
中的更多数据,但实际上操作系统更喜欢早期的“短读”而不是等待。
当您需要处理多个流时(例如 child 进程的 stdout
和 stderr
)或,这种基于 BufReader
的简单方法变得更加危险多个进程。例如,当 child 进程等待您排空其 stderr
管道时,基于 BufReader
的方法可能会死锁,而您的进程因等待它为空 stdout
.
同样,当您不希望您的程序无限期地等待 child 进程时,您不能使用 BufReader
。也许您想在 child 仍在工作且没有输出时显示进度条或计时器。
如果您的操作系统恰好不急于return将数据传输到进程(更喜欢“完整读取”而不是“短读取”),则您不能使用基于 BufReader
的方法) 因为在那种情况下,child 进程打印的最后几行可能会在灰色区域结束:操作系统得到了它们,但它们的大小不足以填充 BufReader
的缓冲区.
BufReader
仅限于 Read
接口允许它对流执行的操作,它的阻塞程度不亚于底层流。为了提高效率,它将 read 分块输入,告诉操作系统尽可能多地填充可用的缓冲区。
您可能想知道为什么在这里以块的形式读取数据如此重要,为什么 BufReader
不能逐字节读取数据。问题是要从流中读取数据,我们需要操作系统的帮助。另一方面,我们不是操作系统,我们独立于它工作,以免在我们的进程出现问题时干扰它。因此,为了调用操作系统,需要转换到“内核模式”,这也可能会导致“上下文切换”。这就是为什么调用操作系统读取每个字节的代价很高。我们想要尽可能少的 OS 调用,所以我们分批获取流数据。
要在不阻塞的情况下等待流,您需要 non-blocking 流。 MIO promises to have the required non-blocking stream support for pipes, most probably with PipeReader,不过我还没看呢
流的 non-blocking 性质应该可以分块读取数据,而不管操作系统是否喜欢“短读取”。因为 non-blocking 流从不阻塞。如果流中没有数据,它会简单地告诉您。
在缺少 non-blocking 流的情况下,您将不得不求助于生成线程,以便阻塞读取将在单独的线程中执行,从而不会阻塞您的主线程。您可能还想逐字节读取流,以便在操作系统不喜欢“短读取”的情况下立即对行分隔符做出反应。这是一个工作示例:https://gist.github.com/ArtemGr/db40ae04b431a95f2b78.
P.S。这是一个允许通过共享字节向量监视程序标准输出的函数示例:
use std::io::Read;
use std::process::{Command, Stdio};
use std::sync::{Arc, Mutex};
use std::thread;
/// Pipe streams are blocking, we need separate threads to monitor them without blocking the primary thread.
fn child_stream_to_vec<R>(mut stream: R) -> Arc<Mutex<Vec<u8>>>
where
R: Read + Send + 'static,
{
let out = Arc::new(Mutex::new(Vec::new()));
let vec = out.clone();
thread::Builder::new()
.name("child_stream_to_vec".into())
.spawn(move || loop {
let mut buf = [0];
match stream.read(&mut buf) {
Err(err) => {
println!("{}] Error reading from stream: {}", line!(), err);
break;
}
Ok(got) => {
if got == 0 {
break;
} else if got == 1 {
vec.lock().expect("!lock").push(buf[0])
} else {
println!("{}] Unexpected number of bytes: {}", line!(), got);
break;
}
}
}
})
.expect("!thread");
out
}
fn main() {
let mut cat = Command::new("cat")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("!cat");
let out = child_stream_to_vec(cat.stdout.take().expect("!stdout"));
let err = child_stream_to_vec(cat.stderr.take().expect("!stderr"));
let mut stdin = match cat.stdin.take() {
Some(stdin) => stdin,
None => panic!("!stdin"),
};
}
我用它来控制 SSH 会话:
try_s! (stdin.write_all (b"echo hello world\n"));
try_s! (wait_forˢ (&out, 0.1, 9., |s| s == "hello world\n"));
P.S。请注意 read call in async-std is blocking as well. It's just instead of blocking a system thread it only blocks a chain of futures (a stack-less green thread essentially). The poll_read is the non-blocking interface. In async-std#499 上的 await
我问过开发人员这些 API 是否有短读保证。
P.S。可能有a similar concern in Nom:“我们会告诉IO端根据解析器的结果(不完整或不完整)重新填充”
P.S。看看流式阅读是如何在 crossterm 中实现的可能会很有趣。对于 Windows,在 poll.rs, they are using the native WaitForMultipleObjects. In unix.rs 他们正在使用 mio poll
。
东京的Command
这里是一个使用tokio0.2的例子:
use std::process::Stdio;
use futures::StreamExt; // 0.3.1
use tokio::{io::BufReader, prelude::*, process::Command}; // 0.2.4, features = ["full"]
#[tokio::main]
async fn main() {
let mut cmd = Command::new("/tmp/slow.bash")
.stdout(Stdio::piped()) // Can do the same for stderr
.spawn()
.expect("cannot spawn");
let stdout = cmd.stdout().take().expect("no stdout");
// Can do the same for stderr
// To print out each line
// BufReader::new(stdout)
// .lines()
// .for_each(|s| async move { println!("> {:?}", s) })
// .await;
// To print out each line *and* collect it all into a Vec
let result: Vec<_> = BufReader::new(stdout)
.lines()
.inspect(|s| println!("> {:?}", s))
.collect()
.await;
println!("All the lines: {:?}", result);
}
Tokio-Threadpool
这里是一个使用tokio0.1和tokio-threadpool的例子。我们使用 blocking
函数在线程中启动进程。我们将其转换为 stream::poll_fn
use std::process::{Command, Stdio};
use tokio::{prelude::*, runtime::Runtime}; // 0.1.18
use tokio_threadpool; // 0.1.13
fn stream_command_output(
mut command: Command,
) -> impl Stream<Item = Vec<u8>, Error = tokio_threadpool::BlockingError> {
// Ensure that the output is available to read from and start the process
let mut child = command
.stdout(Stdio::piped())
.spawn()
.expect("cannot spawn");
let mut stdout = child.stdout.take().expect("no stdout");
// Create a stream of data
stream::poll_fn(move || {
// Perform blocking IO
tokio_threadpool::blocking(|| {
// Allocate some space to store anything read
let mut data = vec![0; 128];
// Read 1-128 bytes of data
let n_bytes_read = stdout.read(&mut data).expect("cannot read");
if n_bytes_read == 0 {
// Stdout is done
None
} else {
// Only return as many bytes as we read
data.truncate(n_bytes_read);
Some(data)
}
})
})
}
fn main() {
let output_stream = stream_command_output(Command::new("/tmp/slow.bash"));
let mut runtime = Runtime::new().expect("Unable to start the runtime");
let result = runtime.block_on({
output_stream
.map(|d| String::from_utf8(d).expect("Not UTF-8"))
.fold(Vec::new(), |mut v, s| {
print!("> {}", s);
v.push(s);
Ok(v)
})
});
println!("All the lines: {:?}", result);
}
这里可以做出许多可能的权衡。例如,总是分配 128 个字节并不理想,但实现起来很简单。
支持
供参考,这里是slow.bash:
#!/usr/bin/env bash
set -eu
val=0
while [[ $val -lt 10 ]]; do
echo $val
val=$(($val + 1))
sleep 1
done
另请参阅:
如果 Unix 支持足够,您还可以使两个输出流成为非阻塞的并轮询它们,就像在 TcpStream
上使用 set_nonblocking
函数那样。
Command spawn返回的ChildStdout
和ChildStderr
是Stdio
(并且包含一个文件描述符),可以直接修改这些handle的读行为使其非-阻塞。
基于jcreekmore/timeout-readwrite-rs and anowell/nonblock-rs的工作,我使用这个包装器来修改流句柄:
extern crate libc;
use std::io::Read;
use std::os::unix::io::AsRawFd;
use libc::{F_GETFL, F_SETFL, fcntl, O_NONBLOCK};
fn set_nonblocking<H>(handle: &H, nonblocking: bool) -> std::io::Result<()>
where
H: Read + AsRawFd,
{
let fd = handle.as_raw_fd();
let flags = unsafe { fcntl(fd, F_GETFL, 0) };
if flags < 0 {
return Err(std::io::Error::last_os_error());
}
let flags = if nonblocking{
flags | O_NONBLOCK
} else {
flags & !O_NONBLOCK
};
let res = unsafe { fcntl(fd, F_SETFL, flags) };
if res != 0 {
return Err(std::io::Error::last_os_error());
}
Ok(())
}
您可以像管理任何其他非阻塞流一样管理这两个流。下面的例子是基于 polling crate,这使得读取事件和 BufReader
用于行读取变得非常容易:
use std::process::{Command, Stdio};
use std::path::PathBuf;
use std::io::{BufReader, BufRead};
use std::thread;
extern crate polling;
use polling::{Event, Poller};
fn main() -> Result<(), std::io::Error> {
let path = PathBuf::from("./worker.sh").canonicalize()?;
let mut child = Command::new(path)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("Failed to start worker");
let handle = thread::spawn({
let stdout = child.stdout.take().unwrap();
set_nonblocking(&stdout, true)?;
let mut reader_out = BufReader::new(stdout);
let stderr = child.stderr.take().unwrap();
set_nonblocking(&stderr, true)?;
let mut reader_err = BufReader::new(stderr);
move || {
let key_out = 1;
let key_err = 2;
let mut out_closed = false;
let mut err_closed = false;
let poller = Poller::new().unwrap();
poller.add(reader_out.get_ref(), Event::readable(key_out)).unwrap();
poller.add(reader_err.get_ref(), Event::readable(key_err)).unwrap();
let mut line = String::new();
let mut events = Vec::new();
loop {
// Wait for at least one I/O event.
events.clear();
poller.wait(&mut events, None).unwrap();
for ev in &events {
// stdout is ready for reading
if ev.key == key_out {
let len = match reader_out.read_line(&mut line) {
Ok(len) => len,
Err(e) => {
println!("stdout read returned error: {}", e);
0
}
};
if len == 0 {
println!("stdout closed (len is null)");
out_closed = true;
poller.delete(reader_out.get_ref()).unwrap();
} else {
print!("[STDOUT] {}", line);
line.clear();
// reload the poller
poller.modify(reader_out.get_ref(), Event::readable(key_out)).unwrap();
}
}
// stderr is ready for reading
if ev.key == key_err {
let len = match reader_err.read_line(&mut line) {
Ok(len) => len,
Err(e) => {
println!("stderr read returned error: {}", e);
0
}
};
if len == 0 {
println!("stderr closed (len is null)");
err_closed = true;
poller.delete(reader_err.get_ref()).unwrap();
} else {
print!("[STDERR] {}", line);
line.clear();
// reload the poller
poller.modify(reader_err.get_ref(), Event::readable(key_err)).unwrap();
}
}
}
if out_closed && err_closed {
println!("Stream closed, exiting process thread");
break;
}
}
}
});
handle.join().unwrap();
Ok(())
}
此外,与 EventFd 上的包装器一起使用,可以轻松地从另一个线程停止进程,而不会阻塞,也不会主动轮询,并且只使用一个线程。
编辑: 在我的测试之后,轮询箱似乎自动将轮询句柄设置为非阻塞模式。如果您想直接使用 nix::poll 对象,set_nonblocking 函数仍然有用。
我遇到了足够多的用例,在这些用例中,通过行分隔文本与子进程交互非常有用,我为此编写了一个箱子,interactive_process。
我希望原来的问题早就解决了,但我认为这可能对其他人有所帮助。