如何停止由进程启动的外部 I/O 阻塞的 goroutine?

How to stop goroutine blocked by external I/O started for process?

我遇到一个问题,我无法安全退出 goroutine。

我有一个使用 exec.Command 创建的外部进程(存储进程的 cmd、stdin 管道和 stdout 管道):

exec.Command(args[0], args[1]...) // args[0] is a base command

每当需要启动该过程时,我都会致电:

cmd.Start()

然后在开始和附加时我是 运行 2 个协程:

shutdown := make(chan struct{})
// Run the routine which will read from process and send the data to CmdIn channel
go pr.cmdInRoutine()
// Run the routine which will read from CmdOut and write to process
go pr.cmdOutRoutine()

cmdInRoutine:

func (pr *ExternalProcess) cmdInRoutine() {
    app.At(te, "cmdInRoutine")

    for {
        println("CMDINROUTINE")
        select {
        case <-pr.shutdown:
            println("!!! Shutting cmdInRoutine down !!!")
            return
        default:
            println("Inside the for loop of the CmdInRoutine")
            if pr.stdOutPipe == nil {
                println("!!! Standard output pipe is nil. Sending Exit Request !!!")
                pr.ProcessExit <- true
                close(pr.shutdown)
                return
            }

            buf := make([]byte, 2048)

            size, err := pr.stdOutPipe.Read(buf)
            if err != nil {
                println("!!! Sending exit request from cmdInRoutine !!!")
                pr.ProcessExit <- true
                close(pr.shutdown)
                return
            }

            println("--- Received data for sending to CmdIn:", string(buf[:size]))
            pr.CmdIn <- buf[:size]
        }

    }
}

cmdOutRoutine:

func (pr *ExternalProcess) cmdOutRoutine() {
    app.At(te, "cmdOutRoutine")

    for {
        select {
        case <-pr.shutdown:
            println("!!! Shutting cmdOutRoutine down !!!")
            return
        case data := <-pr.CmdOut:
            println("Received data for sending to Process: ", data)
            if pr.stdInPipe == nil {
                println("!!! Standard input pipe is nil. Sending Exit Request !!!")
                pr.ProcessExit <- true
                return
            }

            println("--- Received input to write to external process:", string(data))
            _, err := pr.stdInPipe.Write(append(data, '\n'))
            if err != nil {
                println("!!! Couldn't Write To the std in pipe of the process !!!")
                pr.ProcessExit <- true
                return
            }
        }
    }
}

此处有趣的案例:

1) 当进程发送 EOF 时(不要介意 pr.ProcessExit <- true 我正在使用通道通知父处理程序停止并退出process) in cmdInRoutine 我也在关闭关闭通道,这让 cmdOutRoutine 退出,因为在 select 语句中没有默认值情况下,它会阻塞并等待退出或数据,然后使用存储的 stdInPipe.

将数据写入 运行 进程

2) 当我只想停止 goroutines 但离开进程 运行 时,即暂停读写,我正在关闭关闭通道,希望这 2 个 goroutines 能够结束。
- cmdOutRoutine 打印 !!!关闭 cmdOutRoutine !!! 因为 select 没有默认情况并且关闭关闭通道会导致几乎立即返回
- cmdOutRoutine 不打印任何东西,我有一种奇怪的感觉,它甚至没有返回,我想是因为 它在默认情况下被阻止从 stdInPipe.

读取

我在想 运行 在 for 循环之前 cmdOutRoutine 中的另一个 goroutine 并且有进程的 stdIn 数据转换成通道然后我将能够消除 cmdInRoutine 中的默认情况,但这会产生另一个问题,新的 goroutine 也必须停止它仍然会被 [= =40=]stdIn 运行 进程。

有什么想法可以解决这个问题(修改逻辑)以满足随时关闭和启动 goroutines(进程 I/O)而不是 运行 进程本身的需要?或者有没有办法完全避免阻塞读写调用,我还不知道?

非常感谢。

它可能在 pr.stdOutPipe.Read(buf) 被屏蔽了。您可以尝试关闭 pr.stdOutPipe,这应该会中断读取。

您也可以关闭 pr.stdInPipe,以确保写入不会阻塞。

编辑:这将不允许您重新附加,但没有其他方法可以中断读取。最好在整个过程中只保留这两个 goroutines 运行,并在堆栈中的其他地方暂停(例如,如果您不想在暂停状态下接收命令的输出,请不要写 bufpr.CmdIn - 但要小心避免竞争条件)。

关闭在当前版本的 go 中可能有错误:issue 6817

编辑结束

此外,请注意 pr.CmdIn。如果关闭 stdOutPipe 不会导致 Read to return 出错,cmdInRoutine 将尝试写入通道。如果没有任何内容从中读取,cmdInRoutine 将永远阻塞。我会将 pr.stdOutPipe.Read(buf) 移出 select,然后将 pr.CmdIn <- buf[:size] 作为另一种情况添加到 select:

func (pr *ExternalProcess) cmdInRoutine() {
    app.At(te, "cmdInRoutine")

    // this check should probably only happen once.
    // if stdOutPipe can change during the loop,
    // then that's a race condition.
    if pr.stdOutPipe == nil {
        println("!!! Standard output pipe is nil. Sending Exit Request !!!")
        pr.ProcessExit <- true
        close(pr.shutdown)
        return
    }

    for {
        println("CMDINROUTINE")
        // we need to allocate a new buffer in each iteration,
        // because when we pass it through the channel,
        // we can no longer safely overwrite the data in it,
        // since the other goroutine might still be using it.
        buf := make([]byte, 2048)
        size, err := pr.stdOutPipe.Read(buf)
        if err != nil {
            println("!!! Sending exit request from cmdInRoutine !!!")
            // Be careful with this, if you also closed pr.shutdown when you closed stdOutPipe, then this is going to panic (closing a closed channel).
            pr.ProcessExit <- true
            close(pr.shutdown)
            return
        }

        // now that we have some data, try to send it,
        // unless we're done.
        select {
        case <-pr.shutdown:
            println("!!! Shutting cmdInRoutine down !!!")
            return
        case pr.CmdIn <- buf[:size]:
            println("--- Received data for sending to CmdIn:", string(buf[:size]))
        }
    }
}