Golang - 到多个节点的并发 SSH 连接

Golang - concurrent SSH connections to multiple nodes

我有一组服务器正在尝试建立 SSH 连接,并且我正在为我必须建立的每个新 SSH 连接生成一个新的 goroutine。然后我将该连接的结果(连同错误(如果有的话))发送到一个通道,然后从该通道读取。这个程序有点用,但即使我关闭了频道,它最终还是卡住了。

这是我目前拥有的:

package main

import (
    "fmt"
    "net"
    "sync"

    "github.com/awslabs/aws-sdk-go/aws"
    "github.com/awslabs/aws-sdk-go/service/ec2"
)

// ConnectionResult container
type ConnectionResult struct {
    host    string
    message string
}

func main() {
    cnres := make(chan ConnectionResult)
    ec2svc := ec2.New(&aws.Config{Region: "us-east-1"})
    wg := sync.WaitGroup{}

    params := &ec2.DescribeInstancesInput{
        Filters: []*ec2.Filter{
            &ec2.Filter{
                Name: aws.String("instance-state-name"),
                Values: []*string{
                    aws.String("running"),
                },
            },
        },
    }

    resp, err := ec2svc.DescribeInstances(params)
    if err != nil {
        panic(err)
    }

    for _, res := range resp.Reservations {
        for _, inst := range res.Instances {
            for _, tag := range inst.Tags {
                if *tag.Key == "Name" {
                    host := *tag.Value
                    wg.Add(1)
                    go func(hostname string, cr chan ConnectionResult) {
                        defer wg.Done()
                        _, err := net.Dial("tcp", host+":22")
                        if err != nil {
                            cr <- ConnectionResult{host, "failed"}
                        } else {
                            cr <- ConnectionResult{host, "succeeded"}
                        }
                    }(host, cnres)
                }
            }
        }
    }

    for cr := range cnres {
        fmt.Println("Connection to " + cr.host + " " + cr.message)
    }

    close(cnres)

    defer wg.Wait()
}

我做错了什么?有没有更好的方法在 Go 中进行并发 SSH 连接?

上面的代码卡在了 range cnres for 循环中。正如优秀的 'Go by Example' 中所指出的,range 只会在关闭的频道上退出。

解决这个困难的一种方法是 运行 在另一个 goroutine 中进行 range cnres 迭代。然后您可以 wg.Wait(),然后 close() 频道,例如:

...
go func() {
        for cr := range cnres {
                fmt.Println("Connection to " + cr.host + " " + cr.message)
        }   
}() 
wg.Wait()
close(cnres)

切线说明(独立于被卡住的代码),我认为意图是在 Dial() 函数和后续通道写入中使用 hostname,而不是 host.

感谢 Frederik,我能够 运行 成功地得到这个:

package main

import (
    "fmt"
    "net"
    "sync"

    "github.com/awslabs/aws-sdk-go/aws"
    "github.com/awslabs/aws-sdk-go/service/ec2"
)

// ConnectionResult container
type ConnectionResult struct {
    host    string
    message string
}

func main() {
    cnres := make(chan ConnectionResult)
    ec2svc := ec2.New(&aws.Config{Region: "us-east-1"})
    wg := sync.WaitGroup{}

    params := &ec2.DescribeInstancesInput{
        Filters: []*ec2.Filter{
            &ec2.Filter{
                Name: aws.String("instance-state-name"),
                Values: []*string{
                    aws.String("running"),
                },
            },
        },
    }

    resp, err := ec2svc.DescribeInstances(params)
    if err != nil {
        panic(err)
    }

    for _, res := range resp.Reservations {
        for _, inst := range res.Instances {
            for _, tag := range inst.Tags {
                if *tag.Key == "Name" {
                    host := *tag.Value
                    publicdnsname := *inst.PublicDNSName
                    wg.Add(1)
                    go func(ec2name, cbname string, cr chan ConnectionResult) {
                        defer wg.Done()
                        _, err := net.Dial("tcp", ec2name+":22")
                        if err != nil {
                            cr <- ConnectionResult{cbname, "failed"}
                        } else {
                            cr <- ConnectionResult{cbname, "succeeded"}
                        }
                    }(publicdnsname, host, cnres)
                }
            }
        }
    }

    go func() {
        for cr := range cnres {
            fmt.Println("Connection to " + cr.host + " " + cr.message)
        }
    }()

    wg.Wait()
}

Frederik 的解决方案工作正常,但有一些例外。如果命令组例程(从写入通道的循环)执行响应时间稍长的命令,处理例程(Frederik 的提示)将在最后一个命令例程完成之前处理并关闭通道,因此可能会发生一些数据丢失。

在我的例子中,我使用它对多个服务器执行远程 SSH 命令并打印响应。我的工作解决方案是使用 2 个单独的 WaitGroups,一个用于命令组例程,第二个用于处理例程。这样,处理例程将等待所有命令例程完成,然后处理响应并关闭通道以退出循环:

// Create waitgroup, channel and execute command with concurrency (goroutine)
outchan := make(chan CommandResult)
var wg_command sync.WaitGroup
var wg_processing sync.WaitGroup
for _, t := range validNodes {
    wg_command.Add(1)
    target := t + " (" + user + "@" + nodes[t] + ")"
    go func(dst, user, ip, command string, out chan CommandResult) {
        defer wg_command.Done()
        result := remoteExec(user, ip, cmdCommand)
        out <- CommandResult{dst, result}
    }(target, user, nodes[t], cmdCommand, outchan)
}

wg_processing.Add(1)
go func() {
    defer wg_processing.Done()
    for o := range outchan {
        bBlue.Println(o.target, "=>", cmdCommand)
        fmt.Println(o.cmdout)
    }
}()

// wait untill all goroutines to finish and close the channel
wg_command.Wait()
close(outchan)
wg_processing.Wait()