Golang - 到多个节点的并发 SSH 连接
Golang - concurrent SSH connections to multiple nodes
我有一组服务器正在尝试建立 SSH 连接,并且我正在为我必须建立的每个新 SSH 连接生成一个新的 goroutine。然后我将该连接的结果(连同错误(如果有的话))发送到一个通道,然后从该通道读取。这个程序有点用,但即使我关闭了频道,它最终还是卡住了。
这是我目前拥有的:
package main
import (
"fmt"
"net"
"sync"
"github.com/awslabs/aws-sdk-go/aws"
"github.com/awslabs/aws-sdk-go/service/ec2"
)
// ConnectionResult container
type ConnectionResult struct {
host string
message string
}
func main() {
cnres := make(chan ConnectionResult)
ec2svc := ec2.New(&aws.Config{Region: "us-east-1"})
wg := sync.WaitGroup{}
params := &ec2.DescribeInstancesInput{
Filters: []*ec2.Filter{
&ec2.Filter{
Name: aws.String("instance-state-name"),
Values: []*string{
aws.String("running"),
},
},
},
}
resp, err := ec2svc.DescribeInstances(params)
if err != nil {
panic(err)
}
for _, res := range resp.Reservations {
for _, inst := range res.Instances {
for _, tag := range inst.Tags {
if *tag.Key == "Name" {
host := *tag.Value
wg.Add(1)
go func(hostname string, cr chan ConnectionResult) {
defer wg.Done()
_, err := net.Dial("tcp", host+":22")
if err != nil {
cr <- ConnectionResult{host, "failed"}
} else {
cr <- ConnectionResult{host, "succeeded"}
}
}(host, cnres)
}
}
}
}
for cr := range cnres {
fmt.Println("Connection to " + cr.host + " " + cr.message)
}
close(cnres)
defer wg.Wait()
}
我做错了什么?有没有更好的方法在 Go 中进行并发 SSH 连接?
上面的代码卡在了 range cnres
for
循环中。正如优秀的 'Go by Example' 中所指出的,range
只会在关闭的频道上退出。
解决这个困难的一种方法是 运行 在另一个 goroutine 中进行 range cnres
迭代。然后您可以 wg.Wait()
,然后 close()
频道,例如:
...
go func() {
for cr := range cnres {
fmt.Println("Connection to " + cr.host + " " + cr.message)
}
}()
wg.Wait()
close(cnres)
切线说明(独立于被卡住的代码),我认为意图是在 Dial()
函数和后续通道写入中使用 hostname
,而不是 host
.
感谢 Frederik,我能够 运行 成功地得到这个:
package main
import (
"fmt"
"net"
"sync"
"github.com/awslabs/aws-sdk-go/aws"
"github.com/awslabs/aws-sdk-go/service/ec2"
)
// ConnectionResult container
type ConnectionResult struct {
host string
message string
}
func main() {
cnres := make(chan ConnectionResult)
ec2svc := ec2.New(&aws.Config{Region: "us-east-1"})
wg := sync.WaitGroup{}
params := &ec2.DescribeInstancesInput{
Filters: []*ec2.Filter{
&ec2.Filter{
Name: aws.String("instance-state-name"),
Values: []*string{
aws.String("running"),
},
},
},
}
resp, err := ec2svc.DescribeInstances(params)
if err != nil {
panic(err)
}
for _, res := range resp.Reservations {
for _, inst := range res.Instances {
for _, tag := range inst.Tags {
if *tag.Key == "Name" {
host := *tag.Value
publicdnsname := *inst.PublicDNSName
wg.Add(1)
go func(ec2name, cbname string, cr chan ConnectionResult) {
defer wg.Done()
_, err := net.Dial("tcp", ec2name+":22")
if err != nil {
cr <- ConnectionResult{cbname, "failed"}
} else {
cr <- ConnectionResult{cbname, "succeeded"}
}
}(publicdnsname, host, cnres)
}
}
}
}
go func() {
for cr := range cnres {
fmt.Println("Connection to " + cr.host + " " + cr.message)
}
}()
wg.Wait()
}
Frederik 的解决方案工作正常,但有一些例外。如果命令组例程(从写入通道的循环)执行响应时间稍长的命令,处理例程(Frederik 的提示)将在最后一个命令例程完成之前处理并关闭通道,因此可能会发生一些数据丢失。
在我的例子中,我使用它对多个服务器执行远程 SSH 命令并打印响应。我的工作解决方案是使用 2 个单独的 WaitGroups,一个用于命令组例程,第二个用于处理例程。这样,处理例程将等待所有命令例程完成,然后处理响应并关闭通道以退出循环:
// Create waitgroup, channel and execute command with concurrency (goroutine)
outchan := make(chan CommandResult)
var wg_command sync.WaitGroup
var wg_processing sync.WaitGroup
for _, t := range validNodes {
wg_command.Add(1)
target := t + " (" + user + "@" + nodes[t] + ")"
go func(dst, user, ip, command string, out chan CommandResult) {
defer wg_command.Done()
result := remoteExec(user, ip, cmdCommand)
out <- CommandResult{dst, result}
}(target, user, nodes[t], cmdCommand, outchan)
}
wg_processing.Add(1)
go func() {
defer wg_processing.Done()
for o := range outchan {
bBlue.Println(o.target, "=>", cmdCommand)
fmt.Println(o.cmdout)
}
}()
// wait untill all goroutines to finish and close the channel
wg_command.Wait()
close(outchan)
wg_processing.Wait()
我有一组服务器正在尝试建立 SSH 连接,并且我正在为我必须建立的每个新 SSH 连接生成一个新的 goroutine。然后我将该连接的结果(连同错误(如果有的话))发送到一个通道,然后从该通道读取。这个程序有点用,但即使我关闭了频道,它最终还是卡住了。
这是我目前拥有的:
package main
import (
"fmt"
"net"
"sync"
"github.com/awslabs/aws-sdk-go/aws"
"github.com/awslabs/aws-sdk-go/service/ec2"
)
// ConnectionResult container
type ConnectionResult struct {
host string
message string
}
func main() {
cnres := make(chan ConnectionResult)
ec2svc := ec2.New(&aws.Config{Region: "us-east-1"})
wg := sync.WaitGroup{}
params := &ec2.DescribeInstancesInput{
Filters: []*ec2.Filter{
&ec2.Filter{
Name: aws.String("instance-state-name"),
Values: []*string{
aws.String("running"),
},
},
},
}
resp, err := ec2svc.DescribeInstances(params)
if err != nil {
panic(err)
}
for _, res := range resp.Reservations {
for _, inst := range res.Instances {
for _, tag := range inst.Tags {
if *tag.Key == "Name" {
host := *tag.Value
wg.Add(1)
go func(hostname string, cr chan ConnectionResult) {
defer wg.Done()
_, err := net.Dial("tcp", host+":22")
if err != nil {
cr <- ConnectionResult{host, "failed"}
} else {
cr <- ConnectionResult{host, "succeeded"}
}
}(host, cnres)
}
}
}
}
for cr := range cnres {
fmt.Println("Connection to " + cr.host + " " + cr.message)
}
close(cnres)
defer wg.Wait()
}
我做错了什么?有没有更好的方法在 Go 中进行并发 SSH 连接?
上面的代码卡在了 range cnres
for
循环中。正如优秀的 'Go by Example' 中所指出的,range
只会在关闭的频道上退出。
解决这个困难的一种方法是 运行 在另一个 goroutine 中进行 range cnres
迭代。然后您可以 wg.Wait()
,然后 close()
频道,例如:
...
go func() {
for cr := range cnres {
fmt.Println("Connection to " + cr.host + " " + cr.message)
}
}()
wg.Wait()
close(cnres)
切线说明(独立于被卡住的代码),我认为意图是在 Dial()
函数和后续通道写入中使用 hostname
,而不是 host
.
感谢 Frederik,我能够 运行 成功地得到这个:
package main
import (
"fmt"
"net"
"sync"
"github.com/awslabs/aws-sdk-go/aws"
"github.com/awslabs/aws-sdk-go/service/ec2"
)
// ConnectionResult container
type ConnectionResult struct {
host string
message string
}
func main() {
cnres := make(chan ConnectionResult)
ec2svc := ec2.New(&aws.Config{Region: "us-east-1"})
wg := sync.WaitGroup{}
params := &ec2.DescribeInstancesInput{
Filters: []*ec2.Filter{
&ec2.Filter{
Name: aws.String("instance-state-name"),
Values: []*string{
aws.String("running"),
},
},
},
}
resp, err := ec2svc.DescribeInstances(params)
if err != nil {
panic(err)
}
for _, res := range resp.Reservations {
for _, inst := range res.Instances {
for _, tag := range inst.Tags {
if *tag.Key == "Name" {
host := *tag.Value
publicdnsname := *inst.PublicDNSName
wg.Add(1)
go func(ec2name, cbname string, cr chan ConnectionResult) {
defer wg.Done()
_, err := net.Dial("tcp", ec2name+":22")
if err != nil {
cr <- ConnectionResult{cbname, "failed"}
} else {
cr <- ConnectionResult{cbname, "succeeded"}
}
}(publicdnsname, host, cnres)
}
}
}
}
go func() {
for cr := range cnres {
fmt.Println("Connection to " + cr.host + " " + cr.message)
}
}()
wg.Wait()
}
Frederik 的解决方案工作正常,但有一些例外。如果命令组例程(从写入通道的循环)执行响应时间稍长的命令,处理例程(Frederik 的提示)将在最后一个命令例程完成之前处理并关闭通道,因此可能会发生一些数据丢失。
在我的例子中,我使用它对多个服务器执行远程 SSH 命令并打印响应。我的工作解决方案是使用 2 个单独的 WaitGroups,一个用于命令组例程,第二个用于处理例程。这样,处理例程将等待所有命令例程完成,然后处理响应并关闭通道以退出循环:
// Create waitgroup, channel and execute command with concurrency (goroutine)
outchan := make(chan CommandResult)
var wg_command sync.WaitGroup
var wg_processing sync.WaitGroup
for _, t := range validNodes {
wg_command.Add(1)
target := t + " (" + user + "@" + nodes[t] + ")"
go func(dst, user, ip, command string, out chan CommandResult) {
defer wg_command.Done()
result := remoteExec(user, ip, cmdCommand)
out <- CommandResult{dst, result}
}(target, user, nodes[t], cmdCommand, outchan)
}
wg_processing.Add(1)
go func() {
defer wg_processing.Done()
for o := range outchan {
bBlue.Println(o.target, "=>", cmdCommand)
fmt.Println(o.cmdout)
}
}()
// wait untill all goroutines to finish and close the channel
wg_command.Wait()
close(outchan)
wg_processing.Wait()