从 exec.Command 逐行捕获标准输出并通过管道传输到 os.Stdout

Capture stdout from exec.Command line by line and also pipe to os.Stdout

有人能帮忙吗?

我有一个应用程序我是 运行 通过 exec.CommandContext (所以我可以通过 ctx 取消它)。除非出错,否则它通常不会停止。

我目前将它的输出中继到 os.stdOut,效果很好。但我也想通过一个通道获取每一行——这背后的想法是我将在该行中寻找一个正则表达式,如果它是真的那么我将设置一个内部状态“错误”。

虽然我无法让它工作,但我尝试了 NewSscanner。这是我的代码。

正如我所说,它确实输出到 os.StdOut,这很好,但我希望收到我设置的频道中发生的每一行。

有什么想法吗?

提前致谢。

func (d *Daemon) Start() {
    ctx, cancel := context.WithCancel(context.Background())
    d.cancel = cancel

    go func() {
        args := "-x f -a 1"
        cmd := exec.CommandContext(ctx, "mydaemon", strings.Split(args, " ")...)

        var stdoutBuf, stderrBuf bytes.Buffer

        cmd.Stdout = io.MultiWriter(os.Stdout, &stdoutBuf)
        cmd.Stderr = io.MultiWriter(os.Stderr, &stderrBuf)

        lines := make(chan string)

        go func() {
            scanner := bufio.NewScanner(os.Stdin)
            for scanner.Scan() {
                fmt.Println("I am reading a line!")
                lines <- scanner.Text()
            }
        }()

        err := cmd.Start()
        if err != nil {
            log.Fatal(err)
        }

        select {
        case outputx := <-lines:
            // I will do somethign with this!
            fmt.Println("Hello!!", outputx)

        case <-ctx.Done():
            log.Println("I am done!, probably cancelled!")
        }
    }()
}

也试过用这个

        go func() {
            scanner := bufio.NewScanner(&stdoutBuf)
            for scanner.Scan() {
                fmt.Println("I am reading a line!")
                lines <- scanner.Text()
            }
        }()

即使这样,“我正在读一行”也没有出来,我也调试了它,它也没有进入“for scanner..”

也尝试在 &stderrBuf 上扫描,同样,没有任何内容。

您必须扫描 stdoutBuf 而不是 os.Stdin:

scanner := bufio.NewScanner(&stdoutBuf)

cmd.Start() 不等待命令完成。此外,需要调用 cmd.Wait() 以通知进程结束。

reader, writer := io.Pipe()

cmdCtx, cmdDone := context.WithCancel(context.Background())

scannerStopped := make(chan struct{})
go func() {
    defer close(scannerStopped)

    scanner := bufio.NewScanner(reader)
    for scanner.Scan() {
        fmt.Println(scanner.Text())
    }
}()

cmd := exec.Command("ls")
cmd.Stdout = writer
_ = cmd.Start()
go func() {
    _ = cmd.Wait()
    cmdDone()
    writer.Close()
}()
<-cmdCtx.Done()

<-scannerStopped
添加

scannerStopped 以演示扫描器 goroutine 现在停止。

reader, writer := io.Pipe()

scannerStopped := make(chan struct{})
go func() {
    defer close(scannerStopped)

    scanner := bufio.NewScanner(reader)
    for scanner.Scan() {
        fmt.Println(scanner.Text())
    }
}()

cmd := exec.Command("ls")
cmd.Stdout = writer
_ = cmd.Run()

go func() {
    _ = cmd.Wait()
    writer.Close()
}()

<-scannerStopped

并根据需要处理这些行。

注:写的有点匆忙。如果有任何不清楚或不正确的地方,请告诉我。

上下文取消时命令终止。如果可以在命令终止之前读取命令的所有输出,则使用此代码:

func (d *Daemon) Start() {
    ctx, cancel := context.WithCancel(context.Background())
    d.cancel = cancel

    args := "-x f -a 1"
    cmd := exec.CommandContext(ctx, "mydaemon", strings.Split(args, " ")...)
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        log.Fatal(err)
    }
    err = cmd.Start()
    if err != nil {
        log.Fatal(err)
    }

    go func() {
        defer cmd.Wait()
        scanner := bufio.NewScanner(stdout)
        for scanner.Scan() {
            s := scanner.Text()
            fmt.Println(s) // echo to stdout
            // Do something with s
        }
    }()
}

取消上下文时命令终止。
命令终止时继续阅读 stdout returns io.EOF。当 stdout returns 一个错误时,goroutine 跳出扫描循环。

对于使用并发和 goroutines 的正确程序,我们应该尽量证明没有数据竞争,程序不会死锁,goroutines 不会泄漏。让我们努力实现这一目标。

完整代码

游乐场: https://play.golang.org/p/Xv1hJXYQoZq。我建议在本地复制和 运行,因为 playground 不会流式输出 afaik 并且它有超时。

请注意,我已将测试命令更改为 % find /usr/local,这是一个典型的长 运行 命令(>3 秒),具有大量输出行,因为它更适合场景我们应该测试一下。

演练

让我们看看Daemon.Start方法。一开始,它几乎是一样的。不过,最值得注意的是,新代码没有围绕该方法的大部分的 goroutine。即使没有这个,Daemon.Start 方法仍然是非阻塞的并且将 return “立即”。


第一个值得注意的修复是这些更新的行。

    outR, outW := io.Pipe()
    cmd.Stdout = io.MultiWriter(outW, os.Stdout)

我们调用 io.Pipe 而不是构建 bytes.Buffer 变量。如果我们没有进行此更改并坚持 bytes.Buffer,那么一旦没有更多数据可读,scanner.Scan() 就会 return false。如果命令只是偶尔写入 stdout(就此而言,即使相隔一毫秒),也会发生这种情况。在 scanner.Scan() return 为 false 后,goroutine 退出,我们错过了处理未来输出的机会。

相反,通过使用 io.Pipe 的读取端,scanner.Scan() 将等待来自管道读取端的输入,直到管道的写入端关闭。

这解决了扫描器和命令输出之间的竞争问题。


接下来,我们构造两个密切相关的 goroutines:第一个从 <-lines 消费,第二个从 lines<-.

生产
    go func() {
        for line := range lines {
            fmt.Println("output line from channel:", line)
            ...
        }
    }()
    go func() {
        defer close(lines)
        scanner := bufio.NewScanner(outR)
        for scanner.Scan() {
            lines <- scanner.Text()
        }
        ...
    }()

消费者goroutine会在lines通道关闭时退出,因为通道的关闭自然会导致运行ge循环终止;生产者 goroutine 在退出时关闭 lines

scanner.Scan() returns false 时生产者goroutine 将退出,这发生在io.Pipe 的写端关闭时。此关闭发生在即将发布的代码中。

注意上面两段中的两个 goroutines gua运行teed 退出(即不会泄漏)。


接下来,我们启动命令。标准的东西,它是一个非阻塞调用,它会立即 return。

// Start the command.
if err := cmd.Start(); err != nil {
    log.Fatal(err)
}

继续 Daemon.Start 中的最后一段代码。这个 goroutine 等待命令通过 cmd.Wait() 退出。处理这个很重要,因为命令可能出于上下文取消以外的原因。

特别是,我们要关闭 io.Pipe 的写入端(如前所述,这反过来会关闭输出行生产者 goroutine)。

    go func() {
        err := cmd.Wait()
        fmt.Println("command exited; error is:", err)
        outW.Close()
        ...
    }()

附带说明一下,通过等待 cmd.Wait(),我们不必单独等待 ctx.Done()。等待 cmd.Wait() 处理由自然原因引起的退出(命令成功完成,命令 运行 进入内部错误等)和由上下文取消引起的退出。

这个 goroutine 也可以运行退出。它会在 cmd.Wait() return 秒时退出。这可能是因为命令成功正常退出;由于命令错误而失败退出;或由于上下文取消而失败退出。


就是这样!我们应该没有数据竞争,没有死锁,也没有泄漏的 goroutines。

上面代码片段中省略的行(“...”)适用于守护程序类型的 Done()CmdErr()Cancel() 方法。这些方法在代码中有很好的记录,因此这些省略的行很有希望是不言自明的。

除此之外,根据您的需要寻找 TODO 错误处理注释!

测试一下!

使用这个驱动程序来测试代码。

func main() {
    var d Daemon
    d.Start()

    // Enable this code to test Context cancellation:
    // time.AfterFunc(100*time.Millisecond, d.Cancel)

    <-d.Done()
    fmt.Println("d.CmdErr():", d.CmdErr())
}