使用 golang 客户端从远程 kubernetes 命令流式输出
Stream output from remote kubernetes command with golang client
我正在使用 golang kubernetes 客户端创建 kubernetes pods 并在其中执行远程命令。但是,我发现在远程执行完成之前我无法获得有关远程执行状态的反馈,因为我不知道如何流式传输远程命令的日志。这是我当前执行远程命令的实现:
func (k *KubernetesClient) RunCommand(ctx context.Context, args *RunCommandArgs) (string, int, error) {
req := k.clientset.CoreV1().RESTClient().Post().Resource("pods").Name(args.ContainerId).Namespace(k.namespace).SubResource("exec").Param("container", CONTAINER_NAME)
scheme := runtime.NewScheme()
if err := v1.AddToScheme(scheme); err != nil {
return "", 0, fmt.Errorf("could not add to scheme: %w", err)
}
parameterCodec := runtime.NewParameterCodec(scheme)
req.VersionedParams(&v1.PodExecOptions{
Stdin: false,
Stdout: true,
Stderr: true,
TTY: false,
Container: args.ContainerId,
Command: []string{"sh", "-c", args.Command},
}, parameterCodec)
exec, err := remotecommand.NewSPDYExecutor(k.config, "POST", req.URL())
if err != nil {
return "", 0, fmt.Errorf("could not exec command: %w", err)
}
var stdout, stderr bytes.Buffer
var streamErr error
streamErr = exec.Stream(remotecommand.StreamOptions{
Stdin: nil,
Stdout: &stdout,
Stderr: &stderr,
Tty: false,
})
if streamErr != nil {
if strings.Contains(streamErr.Error(), "command terminated with exit code") {
return stderr.String(), 1, nil
} else {
return "", 0, fmt.Errorf("could not stream results: %w", streamErr)
}
}
return stdout.String(), 0, nil
}
在此实现中,我不知道远程命令的状态,直到它完成执行,此时我立即获得所有输出日志。
有没有办法在调用 exec.Stream
写入时读取 stdout
/stderr
?在理想情况下,我希望能够逐行打印远程命令行的输出。我注意到 bytes.Buffer
有一个 ReadString
方法接受分隔符。这看起来像是一个有用的方法,但我一直无法弄清楚如何使用它。
这只是部分答案,但如果我设置使用以下 PodExecOptions
和 StreamOptions
然后我看到每个日志行都被实时打印(请注意 Tty
是 true
并且我使用的是标准输入和标准输出,而不是自定义缓冲区):
v1.PodExecOptions{
Stdin: true,
Stdout: true,
Stderr: false,
TTY: true,
Container: args.ContainerId,
Command: []string{"sh", "-c", args.Command},
}
和
remotecommand.StreamOptions{
Stdin: os.Stdin,
Stdout: os.Stdout,
Stderr: nil,
Tty: true,
}
但是,如果我尝试使用 os.Stdin
和 os.Stdout
以外的东西,那么我将永远不会得到任何日志行。例如,以下用法不会打印任何内容:
var stdout, stdin bytes.Buffer
var streamErr error
go func() {
streamErr = exec.Stream(remotecommand.StreamOptions{
Stdin: &stdin,
Stdout: &stdout,
Stderr: nil,
Tty: true,
})
}()
time.Sleep(5*time.Second)
log.Info("doing raw string calls on both buffers")
log.Info(stdin.String())
log.Info(stdout.String())
log.Info("starting scan of stdin")
scanner := bufio.NewScanner(&stdin)
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
m := scanner.Text()
fmt.Println(m)
}
log.Info("starting scan of stdout")
scanner = bufio.NewScanner(&stdout)
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
m := scanner.Text()
fmt.Println(m)
}
log.Info("finished scanning of stdout")
我仍在尝试弄清楚如何使用自定义缓冲区,以便我可以管理写入日志的内容,而不是直接通过管道传输到标准输出(我想将一些自定义字段附加到记录的每一行)。
编辑:好的,我想出了一个可行的解决方案。这是完整的代码
type LogStreamer struct{
b bytes.Buffer
}
func (l *LogStreamer) String() string {
return l.b.String()
}
func (l *LogStreamer) Write(p []byte) (n int, err error) {
a := strings.TrimSpace(string(p))
l.b.WriteString(a)
log.Info(a)
return len(p), nil
}
func (k *KubernetesClient) RunCommand(ctx context.Context, args *RunCommandArgs) (string, int, error) {
req := k.clientset.CoreV1().RESTClient().Post().Resource("pods").Name(args.ContainerId).Namespace(k.namespace).SubResource("exec").Param("container", "worker")
scheme := runtime.NewScheme()
if err := v1.AddToScheme(scheme); err != nil {
return "", 0, fmt.Errorf("could not add to scheme: %w", err)
}
parameterCodec := runtime.NewParameterCodec(scheme)
req.VersionedParams(&v1.PodExecOptions{
Stdin: true,
Stdout: true,
Stderr: false,
TTY: true,
Container: args.ContainerId,
Command: []string{"sh", "-c", args.Command},
}, parameterCodec)
exec, err := remotecommand.NewSPDYExecutor(k.config, "POST", req.URL())
if err != nil {
return "", 0, fmt.Errorf("could not exec command: %w", err)
}
var streamErr error
l := &LogStreamer{}
streamErr = exec.Stream(remotecommand.StreamOptions{
Stdin: os.Stdin,
Stdout: l,
Stderr: nil,
Tty: true,
})
if streamErr != nil {
if strings.Contains(streamErr.Error(), "command terminated with exit code") {
return l.String(), 1, nil
} else {
return "", 0, fmt.Errorf("could not stream results: %w", streamErr)
}
}
return l.String(), 0, nil
}
我创建了一个实现 io.Writer
接口的结构,并在 StreamOptions
结构中使用它。另请注意,我 不得不 在 StreamOptions
结构中使用 os.Stdin
否则只有一行会流回 Stdout
.
另请注意,我必须 trim 将缓冲区传递给 LogStreamer.Write
,因为回车 returns 或换行符似乎会导致 logrus 包出现问题。这个解决方案还有更多改进之处,但它肯定朝着正确的方向前进。
我正在使用 golang kubernetes 客户端创建 kubernetes pods 并在其中执行远程命令。但是,我发现在远程执行完成之前我无法获得有关远程执行状态的反馈,因为我不知道如何流式传输远程命令的日志。这是我当前执行远程命令的实现:
func (k *KubernetesClient) RunCommand(ctx context.Context, args *RunCommandArgs) (string, int, error) {
req := k.clientset.CoreV1().RESTClient().Post().Resource("pods").Name(args.ContainerId).Namespace(k.namespace).SubResource("exec").Param("container", CONTAINER_NAME)
scheme := runtime.NewScheme()
if err := v1.AddToScheme(scheme); err != nil {
return "", 0, fmt.Errorf("could not add to scheme: %w", err)
}
parameterCodec := runtime.NewParameterCodec(scheme)
req.VersionedParams(&v1.PodExecOptions{
Stdin: false,
Stdout: true,
Stderr: true,
TTY: false,
Container: args.ContainerId,
Command: []string{"sh", "-c", args.Command},
}, parameterCodec)
exec, err := remotecommand.NewSPDYExecutor(k.config, "POST", req.URL())
if err != nil {
return "", 0, fmt.Errorf("could not exec command: %w", err)
}
var stdout, stderr bytes.Buffer
var streamErr error
streamErr = exec.Stream(remotecommand.StreamOptions{
Stdin: nil,
Stdout: &stdout,
Stderr: &stderr,
Tty: false,
})
if streamErr != nil {
if strings.Contains(streamErr.Error(), "command terminated with exit code") {
return stderr.String(), 1, nil
} else {
return "", 0, fmt.Errorf("could not stream results: %w", streamErr)
}
}
return stdout.String(), 0, nil
}
在此实现中,我不知道远程命令的状态,直到它完成执行,此时我立即获得所有输出日志。
有没有办法在调用 exec.Stream
写入时读取 stdout
/stderr
?在理想情况下,我希望能够逐行打印远程命令行的输出。我注意到 bytes.Buffer
有一个 ReadString
方法接受分隔符。这看起来像是一个有用的方法,但我一直无法弄清楚如何使用它。
这只是部分答案,但如果我设置使用以下 PodExecOptions
和 StreamOptions
然后我看到每个日志行都被实时打印(请注意 Tty
是 true
并且我使用的是标准输入和标准输出,而不是自定义缓冲区):
v1.PodExecOptions{
Stdin: true,
Stdout: true,
Stderr: false,
TTY: true,
Container: args.ContainerId,
Command: []string{"sh", "-c", args.Command},
}
和
remotecommand.StreamOptions{
Stdin: os.Stdin,
Stdout: os.Stdout,
Stderr: nil,
Tty: true,
}
但是,如果我尝试使用 os.Stdin
和 os.Stdout
以外的东西,那么我将永远不会得到任何日志行。例如,以下用法不会打印任何内容:
var stdout, stdin bytes.Buffer
var streamErr error
go func() {
streamErr = exec.Stream(remotecommand.StreamOptions{
Stdin: &stdin,
Stdout: &stdout,
Stderr: nil,
Tty: true,
})
}()
time.Sleep(5*time.Second)
log.Info("doing raw string calls on both buffers")
log.Info(stdin.String())
log.Info(stdout.String())
log.Info("starting scan of stdin")
scanner := bufio.NewScanner(&stdin)
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
m := scanner.Text()
fmt.Println(m)
}
log.Info("starting scan of stdout")
scanner = bufio.NewScanner(&stdout)
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
m := scanner.Text()
fmt.Println(m)
}
log.Info("finished scanning of stdout")
我仍在尝试弄清楚如何使用自定义缓冲区,以便我可以管理写入日志的内容,而不是直接通过管道传输到标准输出(我想将一些自定义字段附加到记录的每一行)。
编辑:好的,我想出了一个可行的解决方案。这是完整的代码
type LogStreamer struct{
b bytes.Buffer
}
func (l *LogStreamer) String() string {
return l.b.String()
}
func (l *LogStreamer) Write(p []byte) (n int, err error) {
a := strings.TrimSpace(string(p))
l.b.WriteString(a)
log.Info(a)
return len(p), nil
}
func (k *KubernetesClient) RunCommand(ctx context.Context, args *RunCommandArgs) (string, int, error) {
req := k.clientset.CoreV1().RESTClient().Post().Resource("pods").Name(args.ContainerId).Namespace(k.namespace).SubResource("exec").Param("container", "worker")
scheme := runtime.NewScheme()
if err := v1.AddToScheme(scheme); err != nil {
return "", 0, fmt.Errorf("could not add to scheme: %w", err)
}
parameterCodec := runtime.NewParameterCodec(scheme)
req.VersionedParams(&v1.PodExecOptions{
Stdin: true,
Stdout: true,
Stderr: false,
TTY: true,
Container: args.ContainerId,
Command: []string{"sh", "-c", args.Command},
}, parameterCodec)
exec, err := remotecommand.NewSPDYExecutor(k.config, "POST", req.URL())
if err != nil {
return "", 0, fmt.Errorf("could not exec command: %w", err)
}
var streamErr error
l := &LogStreamer{}
streamErr = exec.Stream(remotecommand.StreamOptions{
Stdin: os.Stdin,
Stdout: l,
Stderr: nil,
Tty: true,
})
if streamErr != nil {
if strings.Contains(streamErr.Error(), "command terminated with exit code") {
return l.String(), 1, nil
} else {
return "", 0, fmt.Errorf("could not stream results: %w", streamErr)
}
}
return l.String(), 0, nil
}
我创建了一个实现 io.Writer
接口的结构,并在 StreamOptions
结构中使用它。另请注意,我 不得不 在 StreamOptions
结构中使用 os.Stdin
否则只有一行会流回 Stdout
.
另请注意,我必须 trim 将缓冲区传递给 LogStreamer.Write
,因为回车 returns 或换行符似乎会导致 logrus 包出现问题。这个解决方案还有更多改进之处,但它肯定朝着正确的方向前进。