io.Pipe() 未按预期工作。我在这里做错了什么?

io.Pipe() not working as desired. What am I doing wrong here?

我一直在使用 client-go 测试 kubernetes pod 的执行功能。这是与 os.Stdin

完美配合的代码
{
    // Prepare the API URL used to execute another process within the Pod.  In
    // this case, we'll run a remote shell.
    req := coreclient.RESTClient().
        Post().
        Namespace(pod.Namespace).
        Resource("pods").
        Name(pod.Name).
        SubResource("exec").
        VersionedParams(&corev1.PodExecOptions{
            Container: pod.Spec.Containers[0].Name,
            Command:   []string{"/bin/sh"},
            Stdin:     true,
            Stdout:    true,
            Stderr:    true,
            TTY:       true,
        }, scheme.ParameterCodec)

    exec, err := remotecommand.NewSPDYExecutor(restconfig, "POST", req.URL())
    if err != nil {
        panic(err)
    }

    // Put the terminal into raw mode to prevent it echoing characters twice.
    oldState, err := terminal.MakeRaw(0)
    if err != nil {
        panic(err)
    }
    defer terminal.Restore(0, oldState)

    // Connect this process' std{in,out,err} to the remote shell process.
    err = exec.Stream(remotecommand.StreamOptions{
        Stdin:  os.Stdin,
        Stdout: os.Stdout,
        Stderr: os.Stderr,
        Tty:    true,
    })
    if err != nil {
        panic(err)
    }

    fmt.Println()
}

然后我开始用 io.Pipe() 进行测试,这样我就可以给它输入,而不是 os.Stdin,基本上来自变量或任何其他来源。修改后的代码可以在这里找到

{
    // Prepare the API URL used to execute another process within the Pod.  In
    // this case, we'll run a remote shell.
    req := coreclient.RESTClient().
        Post().
        Namespace(pod.Namespace).
        Resource("pods").
        Name(pod.Name).
        SubResource("exec").
        VersionedParams(&corev1.PodExecOptions{
            Container: pod.Spec.Containers[0].Name,
            Command:   []string{"/bin/sh"},
            Stdin:     true,
            Stdout:    true,
            Stderr:    true,
            TTY:       true,
        }, scheme.ParameterCodec)

    exec, err := remotecommand.NewSPDYExecutor(restconfig, "POST", req.URL())
    if err != nil {
        panic(err)
    }

    // Put the terminal into raw mode to prevent it echoing characters twice.
    oldState, err := terminal.MakeRaw(0)
    if err != nil {
        panic(err)
    }
    defer terminal.Restore(0, oldState)

    // Scanning for inputs from os.stdin
    stdin, putStdin := io.Pipe()
    go func() {
        consolescanner := bufio.NewScanner(os.Stdin)
        for consolescanner.Scan() {
            input := consolescanner.Text()
            fmt.Println("input:", input)
            putStdin.Write([]byte(input))
        }
        if err := consolescanner.Err(); err != nil {
            fmt.Println(err)
            os.Exit(1)
        }
    }()

    // Connect this process' std{in,out,err} to the remote shell process.
    err = exec.Stream(remotecommand.StreamOptions{
        Stdin:  stdin,
        Stdout: os.Stdout,
        Stderr: os.Stdout,
        Tty:    true,
    })
    if err != nil {
        panic(err)
    }

    fmt.Println()
}

奇怪的是,这似乎挂起了终端,有人可以指出我做错了什么吗?

我并没有试图理解你所有的代码,但是:当执行一个单独的进程时,你几乎总是想使用 os.Pipe,而不是 io.Pipe

os.Pipe是操作系统创建的管道。 io.Pipe 是一个完全存在于 Go 中的软件结构,它从 io.Writer 复制到 io.Reader。在执行单独的进程时使用io.Pipe通常会通过创建os.Pipe并启动goroutines在io.Pipeos.Pipe之间进行复制来实现。只需使用 os.Pipe.

更新: 感谢@bcmills 提醒我 os.Pipe 的缓冲区取决于系统。

让我们重新看一下 os.Pipe() return 值

reader, writer, err := os.Pipe()

为了解决这个问题,我们应该有一个 MAX_WRITE_SIZE 常量来表示写入 writer 的字节数组的长度。

MAX_WRITE_SIZE 的值也应该是系统相关的。例如,在 linux 中,缓冲区大小为 64k。所以我们可以将 MAX_WRITE_SIZE 配置为 < 64k 的值。

如果要发送的数据长度大于MAX_WRITE_SIZE,可以分块顺序发送。


终端挂起的原因是你使用io.Pipe().

时死锁

关于 io.Pipe() 方法的文档

Pipe creates a synchronous in-memory pipe.

The data is copied directly from the Write to the corresponding Read (or Reads); there is no internal buffering.

所以在没有管道读调用阻塞的情况下写入管道会导致死锁(类似于在没有管道写调用阻塞的情况下从管道读取)

要解决这个问题,你应该使用os.Pipe,它类似于linux pipe命令。

因为数据会被缓冲,所以不需要read/write阻塞。

Data written to the write end of the pipe is buffered by the kernel until it is read from the read end of the pipe

我能够解决我的问题。不幸的是,上述方法中的 none 帮助了我,但我做了以下工作。 我为要输入的字符串创建了一个单独的 io.Reader,然后从上面的代码片段中从 reader 到 putStdin 做了一个 io.Copy。早些时候我使用了 putStdin.Write(<string>) 但没有成功。

我希望这能解决一些人的问题。