我们可以在 golang 中拥有多个具有多个 go routines 的通道吗

Can we have multiple channels having multiple go routines in golang

  1. 能否让多个go routines监听多个channels,在打印所有问题时都​​面临issue。

  2. 我无法打印所有数字我该如何改进这段代码

  3. 如果可能的话,任何人都可以提供一些例子,因为我正在努力解决这个例子。

  4. 每个围棋例程后是否需要time.sleep


    package main
    
    import (
        "fmt"
        "strconv"
        "sync"
        "time"
    )
    
    var count string
    
    func worker3(var3 chan string, wg *sync.WaitGroup) {
        defer wg.Done()
        for ch := range var3 {
            count += ch + " "
        }
    }
    
    func worker2(var2 chan string, var3 chan string, wg *sync.WaitGroup) {
        defer wg.Done()
        for ch := range var2 {
            var3 <- ch
        }
    }
    
    func worker1(var1 chan string, var2 chan string, var3 chan string, wg *sync.WaitGroup) {
        defer wg.Done()
        for ch := range var1 {
            var2 <- ch
        }
    }
    
    func main() {
        var1 := make(chan string, 1500)
        var2 := make(chan string, 1500)
        var3 := make(chan string, 1500)
    
        var wg sync.WaitGroup
        count = ""
        for i := 0; i < 15; i++ {
            time.Sleep(time.Second)
            wg.Add(1)
            go worker1(var1, var2, var3, &wg)
        }
        for i := 0; i < 15; i++ {
            time.Sleep(time.Second)
            wg.Add(1)
            go worker2(var2, var3, &wg)
        }
        for i := 0; i < 15; i++ {
            time.Sleep(time.Second)
            wg.Add(1)
            go worker3(var3, &wg)
        }
    
        for i := 0; i <= 100000; i++ {
            var1 <- strconv.Itoa(i)
        }
        time.Sleep(time.Second)
        wg.Wait()
        fmt.Println(count)
    }

是的,这很复杂,但有一些经验法则应该会让事情变得更简单。

  • 更喜欢对传递给 go-routines 的通道使用形式参数,而不是在全局范围内访问通道。您可以通过这种方式获得更多的编译器检查,以及更好的模块化。
  • 避免在特定的 go-routine(包括 'main' 例程)中在同一通道上读取和写入。否则,死锁的风险要大得多。

让我们看看您的程序在做什么。 您首先初始化了三个缓冲通道 变量 1、变量 2、变量 3

var1 := make(chan string, 1500)
var2 := make(chan string, 1500)
var3 := make(chan string, 1500)

现在你初始化了一个 WaitGroup (wg)

var wg sync.WaitGroup

现在你定义了变量 count 并且该变量是空字符串

count = "" 

下一部分是从 0 到 15 并生成 15 个 worker1 go routines 的循环

for i := 0; i < 15; i++ {
     time.Sleep(time.Second)
     wg.Add(1)
     go worker1(var1, var2, var3, &wg)
}

每次启动一个 worker1 go routine 并将通道和指针传递给 worker1 中的 waitgroup (wg)。

但是 worker1 会做什么呢?

func worker1(var1 chan string, var2 chan string, var3 chan string, wg *sync.WaitGroup) {
        defer wg.Done()
        for ch := range var1 {
            var2 <- ch
        }
    }

worker1 将等待通道 var1 从该通道获取数据并将其传递给通道 var2。

这很好。你绝对不需要这个 time.Sleep(time.Second).

我们下一步

您现在创建一个新循环,它将生成另外 15 个 go 例程 (worker2)。

for i := 0; i < 15; i++ {
    time.Sleep(time.Second)
    wg.Add(1)
    go worker2(var2, var3, &wg)
}

worker2 将从通道 var2 获取所有内容并将其传递给通道 var3

func worker2(var2 chan string, var3 chan string, wg *sync.WaitGroup) {
        defer wg.Done()
        for ch := range var2 {
            var3 <- ch
        }
    }

现在您为 worker3 创建另外 15 个 go 例程。

for i := 0; i < 15; i++ {
            time.Sleep(time.Second)
            wg.Add(1)
            go worker3(var3, &wg)
        }

并且 worker3 从通道 var3 中获取所有内容,通过将其附加到计数字符串来处理该数据

最后一段代码是将数据播种到频道。该循环从 0 - 100000 开始,对于每个数字,将它们转换为字符串并将其传递给通道 var1

下一个程序将等待所有 go routine 完成并打印结果。

好的,这段代码有一些问题。

  1. 你绝对不需要这个 time.Sleep(time.Second) 在每个 go routine 之前你也不需要 time.Sleep 在 wg.Wait() 之前。
  2. 此类工作负载不需要缓冲通道。这是一个简单的管道,你可以使用无缓冲通道,通道将用于任务之间的同步。

当您更改代码以使用无缓冲通道并删除这些 time.Sleep 时,您仍然有问题。问题是 go lang runtime show 的错误:

fatal error: all goroutines are asleep - deadlock!

并终止代码。

但为什么会发生这种情况,我们有 sync.WaitGroup 并且一切看起来都很好。让我们看一个具有相同错误的更简单的程序。

package main

import (
    "log"
    "strconv"
    "sync"
)

func worker(var1 <-chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    for e := range var1 {
        log.Printf("Element e %s ", e)
    }

}
func main() {
    var1 := make(chan string)
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go worker(var1, &wg)
    }

    for i := 0; i < 15; i++ {
        var1 <- strconv.Itoa(i)
    }

    wg.Wait()
}

此代码也会产生与您的代码相同的错误。这是因为这些通道永远不会关闭,并且 go routines(worker)将永远等待通道中的新数据。 Go 运行时检测到并终止进程。

为了防止这种类型的错误,我们需要添加一些机制来告诉 gorutine 我们已经完成并且 go routine 可以停止监听该通道并正确完成。

发送该信号的最简单方法是关闭该 goroutine 读取的通道。这是解决问题的代码。

package main

import (
    "log"
    "strconv"
    "sync"
)

func worker(var1 <-chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    for e := range var1 {
        log.Printf("Element e %s ", e)
    }

}
func main() {
    var1 := make(chan string)
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go worker(var1, &wg)
    }

    for i := 0; i < 15; i++ {
        var1 <- strconv.Itoa(i)
    }
    close(var1)
    wg.Wait()
}

并且此代码不会产生错误。此代码将正确终止。

不过有个窍门。你如何在你的代码中做到这一点?有 15 个从 var1 通道读取的 go 例程,15 个从 var2 通道读取的 goroutine,15 个从 var3 通道读取。

很难知道什么时候可以关闭哪个频道。我们知道通道 var1 首先处理数据,因此我们可以在生产者完成同步通道中的插入数据时关闭它们。原因是在读取以前的数据之前,我们无法将新数据插入通道。因此,当 producer 插入所有数据时,我们知道通道 var1 上的所有数据都已处理,因此关闭通道是安全的。但是频道 var2var3.

有 15 个不同的 go 例程等待通道 var2,15 个等待 var3,这意味着我们需要找到一种方法在 var2 上的所有处理完成后关闭 var2(在所有 goroutines worker1 ), var3 也一样。这可以通过创建两个额外的 goroutine

wg1 和 wg2 并使用该 goroutine 为 worker1 和 worker2 生成 goroutine,这些 goroutine 将作为编排器工作,在这些函数内部我们只为 worker1 和 worker2 创建新的 sync.Group 这些函数将知道当所有这些子 goroutines 完成时。因此,对于 wg1,当所有这些 worker1 goroutine 完成后,我们可以安全地关闭 var2 通道。 wg2 和 var3 通道相同。

这些是 wg1 和 wg2 函数

// wg1
wg.Add(1)
go func() {
        log.Printf("Starting WG1 master go routine")
        var wg1 sync.WaitGroup
        defer func() {
            close(var2)
            wg.Done()
        }()
        for i := 0; i < 15; i++ {
            wg1.Add(1)
            go worker1(var1, var2, &wg1)
        }
        wg1.Wait()
    }()
// wg2
wg.Add(1)
go func() {
        log.Printf("Starting WG2 routine for second stage")
        defer func() {
            close(var3)
            wg.Done()
        }()
        var wg2 sync.WaitGroup
        for i := 0; i < 15; i++ {
            wg2.Add(1)
            go worker2(var2, var3, &wg2)
        }
        wg2.Wait()
    }()

您可以在以下位置找到完整的工作代码: Working example