从 exec.Command 逐行捕获标准输出并通过管道传输到 os.Stdout
Capture stdout from exec.Command line by line and also pipe to os.Stdout
有人能帮忙吗?
我有一个应用程序我是 运行 通过 exec.CommandContext (所以我可以通过 ctx 取消它)。除非出错,否则它通常不会停止。
我目前将它的输出中继到 os.stdOut,效果很好。但我也想通过一个通道获取每一行——这背后的想法是我将在该行中寻找一个正则表达式,如果它是真的那么我将设置一个内部状态“错误”。
虽然我无法让它工作,但我尝试了 NewSscanner。这是我的代码。
正如我所说,它确实输出到 os.StdOut,这很好,但我希望收到我设置的频道中发生的每一行。
有什么想法吗?
提前致谢。
func (d *Daemon) Start() {
ctx, cancel := context.WithCancel(context.Background())
d.cancel = cancel
go func() {
args := "-x f -a 1"
cmd := exec.CommandContext(ctx, "mydaemon", strings.Split(args, " ")...)
var stdoutBuf, stderrBuf bytes.Buffer
cmd.Stdout = io.MultiWriter(os.Stdout, &stdoutBuf)
cmd.Stderr = io.MultiWriter(os.Stderr, &stderrBuf)
lines := make(chan string)
go func() {
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
fmt.Println("I am reading a line!")
lines <- scanner.Text()
}
}()
err := cmd.Start()
if err != nil {
log.Fatal(err)
}
select {
case outputx := <-lines:
// I will do somethign with this!
fmt.Println("Hello!!", outputx)
case <-ctx.Done():
log.Println("I am done!, probably cancelled!")
}
}()
}
也试过用这个
go func() {
scanner := bufio.NewScanner(&stdoutBuf)
for scanner.Scan() {
fmt.Println("I am reading a line!")
lines <- scanner.Text()
}
}()
即使这样,“我正在读一行”也没有出来,我也调试了它,它也没有进入“for scanner..”
也尝试在 &stderrBuf
上扫描,同样,没有任何内容。
您必须扫描 stdoutBuf
而不是 os.Stdin
:
scanner := bufio.NewScanner(&stdoutBuf)
cmd.Start()
不等待命令完成。此外,需要调用 cmd.Wait()
以通知进程结束。
reader, writer := io.Pipe()
cmdCtx, cmdDone := context.WithCancel(context.Background())
scannerStopped := make(chan struct{})
go func() {
defer close(scannerStopped)
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
fmt.Println(scanner.Text())
}
}()
cmd := exec.Command("ls")
cmd.Stdout = writer
_ = cmd.Start()
go func() {
_ = cmd.Wait()
cmdDone()
writer.Close()
}()
<-cmdCtx.Done()
<-scannerStopped
添加 scannerStopped
以演示扫描器 goroutine 现在停止。
reader, writer := io.Pipe()
scannerStopped := make(chan struct{})
go func() {
defer close(scannerStopped)
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
fmt.Println(scanner.Text())
}
}()
cmd := exec.Command("ls")
cmd.Stdout = writer
_ = cmd.Run()
go func() {
_ = cmd.Wait()
writer.Close()
}()
<-scannerStopped
并根据需要处理这些行。
注:写的有点匆忙。如果有任何不清楚或不正确的地方,请告诉我。
上下文取消时命令终止。如果可以在命令终止之前读取命令的所有输出,则使用此代码:
func (d *Daemon) Start() {
ctx, cancel := context.WithCancel(context.Background())
d.cancel = cancel
args := "-x f -a 1"
cmd := exec.CommandContext(ctx, "mydaemon", strings.Split(args, " ")...)
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Fatal(err)
}
err = cmd.Start()
if err != nil {
log.Fatal(err)
}
go func() {
defer cmd.Wait()
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
s := scanner.Text()
fmt.Println(s) // echo to stdout
// Do something with s
}
}()
}
取消上下文时命令终止。
命令终止时继续阅读 stdout
returns io.EOF。当 stdout
returns 一个错误时,goroutine 跳出扫描循环。
对于使用并发和 goroutines 的正确程序,我们应该尽量证明没有数据竞争,程序不会死锁,goroutines 不会泄漏。让我们努力实现这一目标。
完整代码
游乐场: https://play.golang.org/p/Xv1hJXYQoZq。我建议在本地复制和 运行,因为 playground 不会流式输出 afaik 并且它有超时。
请注意,我已将测试命令更改为 % find /usr/local
,这是一个典型的长 运行 命令(>3 秒),具有大量输出行,因为它更适合场景我们应该测试一下。
演练
让我们看看Daemon.Start
方法。一开始,它几乎是一样的。不过,最值得注意的是,新代码没有围绕该方法的大部分的 goroutine。即使没有这个,Daemon.Start
方法仍然是非阻塞的并且将 return “立即”。
第一个值得注意的修复是这些更新的行。
outR, outW := io.Pipe()
cmd.Stdout = io.MultiWriter(outW, os.Stdout)
我们调用 io.Pipe
而不是构建 bytes.Buffer 变量。如果我们没有进行此更改并坚持 bytes.Buffer,那么一旦没有更多数据可读,scanner.Scan()
就会 return false。如果命令只是偶尔写入 stdout(就此而言,即使相隔一毫秒),也会发生这种情况。在 scanner.Scan()
return 为 false 后,goroutine 退出,我们错过了处理未来输出的机会。
相反,通过使用 io.Pipe
的读取端,scanner.Scan()
将等待来自管道读取端的输入,直到管道的写入端关闭。
这解决了扫描器和命令输出之间的竞争问题。
接下来,我们构造两个密切相关的 goroutines:第一个从 <-lines
消费,第二个从 lines<-
.
生产
go func() {
for line := range lines {
fmt.Println("output line from channel:", line)
...
}
}()
go func() {
defer close(lines)
scanner := bufio.NewScanner(outR)
for scanner.Scan() {
lines <- scanner.Text()
}
...
}()
消费者goroutine会在lines
通道关闭时退出,因为通道的关闭自然会导致运行ge循环终止;生产者 goroutine 在退出时关闭 lines
。
当scanner.Scan()
returns false 时生产者goroutine 将退出,这发生在io.Pipe
的写端关闭时。此关闭发生在即将发布的代码中。
注意上面两段中的两个 goroutines gua运行teed 退出(即不会泄漏)。
接下来,我们启动命令。标准的东西,它是一个非阻塞调用,它会立即 return。
// Start the command.
if err := cmd.Start(); err != nil {
log.Fatal(err)
}
继续 Daemon.Start
中的最后一段代码。这个 goroutine 等待命令通过 cmd.Wait()
退出。处理这个很重要,因为命令可能出于上下文取消以外的原因。
特别是,我们要关闭 io.Pipe
的写入端(如前所述,这反过来会关闭输出行生产者 goroutine)。
go func() {
err := cmd.Wait()
fmt.Println("command exited; error is:", err)
outW.Close()
...
}()
附带说明一下,通过等待 cmd.Wait()
,我们不必单独等待 ctx.Done()
。等待 cmd.Wait()
处理由自然原因引起的退出(命令成功完成,命令 运行 进入内部错误等)和由上下文取消引起的退出。
这个 goroutine 也可以运行退出。它会在 cmd.Wait()
return 秒时退出。这可能是因为命令成功正常退出;由于命令错误而失败退出;或由于上下文取消而失败退出。
就是这样!我们应该没有数据竞争,没有死锁,也没有泄漏的 goroutines。
上面代码片段中省略的行(“...
”)适用于守护程序类型的 Done()
、CmdErr()
和 Cancel()
方法。这些方法在代码中有很好的记录,因此这些省略的行很有希望是不言自明的。
除此之外,根据您的需要寻找 TODO
错误处理注释!
测试一下!
使用这个驱动程序来测试代码。
func main() {
var d Daemon
d.Start()
// Enable this code to test Context cancellation:
// time.AfterFunc(100*time.Millisecond, d.Cancel)
<-d.Done()
fmt.Println("d.CmdErr():", d.CmdErr())
}
有人能帮忙吗?
我有一个应用程序我是 运行 通过 exec.CommandContext (所以我可以通过 ctx 取消它)。除非出错,否则它通常不会停止。
我目前将它的输出中继到 os.stdOut,效果很好。但我也想通过一个通道获取每一行——这背后的想法是我将在该行中寻找一个正则表达式,如果它是真的那么我将设置一个内部状态“错误”。
虽然我无法让它工作,但我尝试了 NewSscanner。这是我的代码。
正如我所说,它确实输出到 os.StdOut,这很好,但我希望收到我设置的频道中发生的每一行。
有什么想法吗?
提前致谢。
func (d *Daemon) Start() {
ctx, cancel := context.WithCancel(context.Background())
d.cancel = cancel
go func() {
args := "-x f -a 1"
cmd := exec.CommandContext(ctx, "mydaemon", strings.Split(args, " ")...)
var stdoutBuf, stderrBuf bytes.Buffer
cmd.Stdout = io.MultiWriter(os.Stdout, &stdoutBuf)
cmd.Stderr = io.MultiWriter(os.Stderr, &stderrBuf)
lines := make(chan string)
go func() {
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
fmt.Println("I am reading a line!")
lines <- scanner.Text()
}
}()
err := cmd.Start()
if err != nil {
log.Fatal(err)
}
select {
case outputx := <-lines:
// I will do somethign with this!
fmt.Println("Hello!!", outputx)
case <-ctx.Done():
log.Println("I am done!, probably cancelled!")
}
}()
}
也试过用这个
go func() {
scanner := bufio.NewScanner(&stdoutBuf)
for scanner.Scan() {
fmt.Println("I am reading a line!")
lines <- scanner.Text()
}
}()
即使这样,“我正在读一行”也没有出来,我也调试了它,它也没有进入“for scanner..”
也尝试在 &stderrBuf
上扫描,同样,没有任何内容。
您必须扫描 stdoutBuf
而不是 os.Stdin
:
scanner := bufio.NewScanner(&stdoutBuf)
cmd.Start()
不等待命令完成。此外,需要调用 cmd.Wait()
以通知进程结束。
reader, writer := io.Pipe()
cmdCtx, cmdDone := context.WithCancel(context.Background())
scannerStopped := make(chan struct{})
go func() {
defer close(scannerStopped)
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
fmt.Println(scanner.Text())
}
}()
cmd := exec.Command("ls")
cmd.Stdout = writer
_ = cmd.Start()
go func() {
_ = cmd.Wait()
cmdDone()
writer.Close()
}()
<-cmdCtx.Done()
<-scannerStopped
添加 scannerStopped
以演示扫描器 goroutine 现在停止。
reader, writer := io.Pipe()
scannerStopped := make(chan struct{})
go func() {
defer close(scannerStopped)
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
fmt.Println(scanner.Text())
}
}()
cmd := exec.Command("ls")
cmd.Stdout = writer
_ = cmd.Run()
go func() {
_ = cmd.Wait()
writer.Close()
}()
<-scannerStopped
并根据需要处理这些行。
注:写的有点匆忙。如果有任何不清楚或不正确的地方,请告诉我。
上下文取消时命令终止。如果可以在命令终止之前读取命令的所有输出,则使用此代码:
func (d *Daemon) Start() {
ctx, cancel := context.WithCancel(context.Background())
d.cancel = cancel
args := "-x f -a 1"
cmd := exec.CommandContext(ctx, "mydaemon", strings.Split(args, " ")...)
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Fatal(err)
}
err = cmd.Start()
if err != nil {
log.Fatal(err)
}
go func() {
defer cmd.Wait()
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
s := scanner.Text()
fmt.Println(s) // echo to stdout
// Do something with s
}
}()
}
取消上下文时命令终止。
命令终止时继续阅读 stdout
returns io.EOF。当 stdout
returns 一个错误时,goroutine 跳出扫描循环。
对于使用并发和 goroutines 的正确程序,我们应该尽量证明没有数据竞争,程序不会死锁,goroutines 不会泄漏。让我们努力实现这一目标。
完整代码
游乐场: https://play.golang.org/p/Xv1hJXYQoZq。我建议在本地复制和 运行,因为 playground 不会流式输出 afaik 并且它有超时。
请注意,我已将测试命令更改为 % find /usr/local
,这是一个典型的长 运行 命令(>3 秒),具有大量输出行,因为它更适合场景我们应该测试一下。
演练
让我们看看Daemon.Start
方法。一开始,它几乎是一样的。不过,最值得注意的是,新代码没有围绕该方法的大部分的 goroutine。即使没有这个,Daemon.Start
方法仍然是非阻塞的并且将 return “立即”。
第一个值得注意的修复是这些更新的行。
outR, outW := io.Pipe()
cmd.Stdout = io.MultiWriter(outW, os.Stdout)
我们调用 io.Pipe
而不是构建 bytes.Buffer 变量。如果我们没有进行此更改并坚持 bytes.Buffer,那么一旦没有更多数据可读,scanner.Scan()
就会 return false。如果命令只是偶尔写入 stdout(就此而言,即使相隔一毫秒),也会发生这种情况。在 scanner.Scan()
return 为 false 后,goroutine 退出,我们错过了处理未来输出的机会。
相反,通过使用 io.Pipe
的读取端,scanner.Scan()
将等待来自管道读取端的输入,直到管道的写入端关闭。
这解决了扫描器和命令输出之间的竞争问题。
接下来,我们构造两个密切相关的 goroutines:第一个从 <-lines
消费,第二个从 lines<-
.
go func() {
for line := range lines {
fmt.Println("output line from channel:", line)
...
}
}()
go func() {
defer close(lines)
scanner := bufio.NewScanner(outR)
for scanner.Scan() {
lines <- scanner.Text()
}
...
}()
消费者goroutine会在lines
通道关闭时退出,因为通道的关闭自然会导致运行ge循环终止;生产者 goroutine 在退出时关闭 lines
。
当scanner.Scan()
returns false 时生产者goroutine 将退出,这发生在io.Pipe
的写端关闭时。此关闭发生在即将发布的代码中。
注意上面两段中的两个 goroutines gua运行teed 退出(即不会泄漏)。
接下来,我们启动命令。标准的东西,它是一个非阻塞调用,它会立即 return。
// Start the command.
if err := cmd.Start(); err != nil {
log.Fatal(err)
}
继续 Daemon.Start
中的最后一段代码。这个 goroutine 等待命令通过 cmd.Wait()
退出。处理这个很重要,因为命令可能出于上下文取消以外的原因。
特别是,我们要关闭 io.Pipe
的写入端(如前所述,这反过来会关闭输出行生产者 goroutine)。
go func() {
err := cmd.Wait()
fmt.Println("command exited; error is:", err)
outW.Close()
...
}()
附带说明一下,通过等待 cmd.Wait()
,我们不必单独等待 ctx.Done()
。等待 cmd.Wait()
处理由自然原因引起的退出(命令成功完成,命令 运行 进入内部错误等)和由上下文取消引起的退出。
这个 goroutine 也可以运行退出。它会在 cmd.Wait()
return 秒时退出。这可能是因为命令成功正常退出;由于命令错误而失败退出;或由于上下文取消而失败退出。
就是这样!我们应该没有数据竞争,没有死锁,也没有泄漏的 goroutines。
上面代码片段中省略的行(“...
”)适用于守护程序类型的 Done()
、CmdErr()
和 Cancel()
方法。这些方法在代码中有很好的记录,因此这些省略的行很有希望是不言自明的。
除此之外,根据您的需要寻找 TODO
错误处理注释!
测试一下!
使用这个驱动程序来测试代码。
func main() {
var d Daemon
d.Start()
// Enable this code to test Context cancellation:
// time.AfterFunc(100*time.Millisecond, d.Cancel)
<-d.Done()
fmt.Println("d.CmdErr():", d.CmdErr())
}