使用 golang 客户端从远程 kubernetes 命令流式输出

Stream output from remote kubernetes command with golang client

我正在使用 golang kubernetes 客户端创建 kubernetes pods 并在其中执行远程命令。但是,我发现在远程执行完成之前我无法获得有关远程执行状态的反馈,因为我不知道如何流式传输远程命令的日志。这是我当前执行远程命令的实现:

func (k *KubernetesClient) RunCommand(ctx context.Context, args *RunCommandArgs) (string, int, error) {
    req := k.clientset.CoreV1().RESTClient().Post().Resource("pods").Name(args.ContainerId).Namespace(k.namespace).SubResource("exec").Param("container", CONTAINER_NAME)
    scheme := runtime.NewScheme()
    if err := v1.AddToScheme(scheme); err != nil {
        return "", 0, fmt.Errorf("could not add to scheme: %w", err)
    }
    parameterCodec := runtime.NewParameterCodec(scheme)
    req.VersionedParams(&v1.PodExecOptions{
        Stdin:     false,
        Stdout:    true,
        Stderr:    true,
        TTY:       false,
        Container: args.ContainerId,
        Command:   []string{"sh", "-c", args.Command},
    }, parameterCodec)
    exec, err := remotecommand.NewSPDYExecutor(k.config, "POST", req.URL())
    if err != nil {
        return "", 0, fmt.Errorf("could not exec command: %w", err)
    }
    var stdout, stderr bytes.Buffer
    var streamErr error
    streamErr = exec.Stream(remotecommand.StreamOptions{
        Stdin:  nil,
        Stdout: &stdout,
        Stderr: &stderr,
        Tty:    false,
    })

    if streamErr != nil {
        if strings.Contains(streamErr.Error(), "command terminated with exit code") {
            return stderr.String(), 1, nil
        } else {
            return "", 0, fmt.Errorf("could not stream results: %w", streamErr)
        }
    }
    return stdout.String(), 0, nil
}

在此实现中,我不知道远程命令的状态,直到它完成执行,此时我立即获得所有输出日志。

有没有办法在调用 exec.Stream 写入时读取 stdout/stderr?在理想情况下,我希望能够逐行打印远程命令行的输出。我注意到 bytes.Buffer 有一个 ReadString 方法接受分隔符。这看起来像是一个有用的方法,但我一直无法弄清楚如何使用它。

这只是部分答案,但如果我设置使用以下 PodExecOptionsStreamOptions 然后我看到每个日志行都被实时打印(请注意 Ttytrue 并且我使用的是标准输入和标准输出,而不是自定义缓冲区):

v1.PodExecOptions{
        Stdin:     true,
        Stdout:    true,
        Stderr:    false,
        TTY:       true,
        Container: args.ContainerId,
        Command:   []string{"sh", "-c", args.Command},
    }

remotecommand.StreamOptions{
            Stdin:  os.Stdin,
            Stdout: os.Stdout,
            Stderr: nil,
            Tty:    true,
        }

但是,如果我尝试使用 os.Stdinos.Stdout 以外的东西,那么我将永远不会得到任何日志行。例如,以下用法不会打印任何内容:

    var stdout, stdin bytes.Buffer
    var streamErr error

    go func() {
        streamErr = exec.Stream(remotecommand.StreamOptions{
            Stdin:  &stdin,
            Stdout: &stdout,
            Stderr: nil,
            Tty:    true,
        })
    }()
    time.Sleep(5*time.Second)
    log.Info("doing raw string calls on both buffers")
    log.Info(stdin.String())
    log.Info(stdout.String())
    log.Info("starting scan of stdin")
    scanner := bufio.NewScanner(&stdin)
    scanner.Split(bufio.ScanLines)
    for scanner.Scan() {
        m := scanner.Text()
        fmt.Println(m)
    }
    log.Info("starting scan of stdout")
    scanner = bufio.NewScanner(&stdout)
    scanner.Split(bufio.ScanLines)
    for scanner.Scan() {
        m := scanner.Text()
        fmt.Println(m)
    }
    log.Info("finished scanning of stdout")

我仍在尝试弄清楚如何使用自定义缓冲区,以便我可以管理写入日志的内容,而不是直接通过管道传输到标准输出(我想将一些自定义字段附加到记录的每一行)。


编辑:好的,我想出了一个可行的解决方案。这是完整的代码

type LogStreamer struct{
    b bytes.Buffer
}

func (l *LogStreamer) String() string {
    return l.b.String()
}

func (l *LogStreamer) Write(p []byte) (n int, err error) {
    a := strings.TrimSpace(string(p))
    l.b.WriteString(a)
    log.Info(a)
    return len(p), nil
}

func (k *KubernetesClient) RunCommand(ctx context.Context, args *RunCommandArgs) (string, int, error) {
    req := k.clientset.CoreV1().RESTClient().Post().Resource("pods").Name(args.ContainerId).Namespace(k.namespace).SubResource("exec").Param("container", "worker")
    scheme := runtime.NewScheme()
    if err := v1.AddToScheme(scheme); err != nil {
        return "", 0, fmt.Errorf("could not add to scheme: %w", err)
    }
    parameterCodec := runtime.NewParameterCodec(scheme)
    req.VersionedParams(&v1.PodExecOptions{
        Stdin:     true,
        Stdout:    true,
        Stderr:    false,
        TTY:       true,
        Container: args.ContainerId,
        Command:   []string{"sh", "-c", args.Command},
    }, parameterCodec)
    exec, err := remotecommand.NewSPDYExecutor(k.config, "POST", req.URL())
    if err != nil {
        return "", 0, fmt.Errorf("could not exec command: %w", err)
    }
    var streamErr error
    l := &LogStreamer{}

    streamErr = exec.Stream(remotecommand.StreamOptions{
        Stdin:  os.Stdin,
        Stdout: l,
        Stderr: nil,
        Tty:    true,
    })

    if streamErr != nil {
        if strings.Contains(streamErr.Error(), "command terminated with exit code") {
            return l.String(), 1, nil
        } else {
            return "", 0, fmt.Errorf("could not stream results: %w", streamErr)
        }
    }
    return l.String(), 0, nil
}

我创建了一个实现 io.Writer 接口的结构,并在 StreamOptions 结构中使用它。另请注意,我 不得不 StreamOptions 结构中使用 os.Stdin 否则只有一行会流回 Stdout.

另请注意,我必须 trim 将缓冲区传递给 LogStreamer.Write,因为回车 returns 或换行符似乎会导致 logrus 包出现问题。这个解决方案还有更多改进之处,但它肯定朝着正确的方向前进。