我们可以在 golang 中拥有多个具有多个 go routines 的通道吗
Can we have multiple channels having multiple go routines in golang
能否让多个go routines监听多个channels,在打印所有问题时都面临issue。
我无法打印所有数字我该如何改进这段代码
如果可能的话,任何人都可以提供一些例子,因为我正在努力解决这个例子。
每个围棋例程后是否需要time.sleep
package main
import (
"fmt"
"strconv"
"sync"
"time"
)
var count string
func worker3(var3 chan string, wg *sync.WaitGroup) {
defer wg.Done()
for ch := range var3 {
count += ch + " "
}
}
func worker2(var2 chan string, var3 chan string, wg *sync.WaitGroup) {
defer wg.Done()
for ch := range var2 {
var3 <- ch
}
}
func worker1(var1 chan string, var2 chan string, var3 chan string, wg *sync.WaitGroup) {
defer wg.Done()
for ch := range var1 {
var2 <- ch
}
}
func main() {
var1 := make(chan string, 1500)
var2 := make(chan string, 1500)
var3 := make(chan string, 1500)
var wg sync.WaitGroup
count = ""
for i := 0; i < 15; i++ {
time.Sleep(time.Second)
wg.Add(1)
go worker1(var1, var2, var3, &wg)
}
for i := 0; i < 15; i++ {
time.Sleep(time.Second)
wg.Add(1)
go worker2(var2, var3, &wg)
}
for i := 0; i < 15; i++ {
time.Sleep(time.Second)
wg.Add(1)
go worker3(var3, &wg)
}
for i := 0; i <= 100000; i++ {
var1 <- strconv.Itoa(i)
}
time.Sleep(time.Second)
wg.Wait()
fmt.Println(count)
}
是的,这很复杂,但有一些经验法则应该会让事情变得更简单。
- 更喜欢对传递给 go-routines 的通道使用形式参数,而不是在全局范围内访问通道。您可以通过这种方式获得更多的编译器检查,以及更好的模块化。
- 避免在特定的 go-routine(包括 'main' 例程)中在同一通道上读取和写入。否则,死锁的风险要大得多。
让我们看看您的程序在做什么。
您首先初始化了三个缓冲通道
变量 1、变量 2、变量 3
var1 := make(chan string, 1500)
var2 := make(chan string, 1500)
var3 := make(chan string, 1500)
现在你初始化了一个 WaitGroup (wg)
var wg sync.WaitGroup
现在你定义了变量 count 并且该变量是空字符串
count = ""
下一部分是从 0 到 15 并生成 15 个 worker1 go routines 的循环
for i := 0; i < 15; i++ {
time.Sleep(time.Second)
wg.Add(1)
go worker1(var1, var2, var3, &wg)
}
每次启动一个 worker1 go routine 并将通道和指针传递给 worker1 中的 waitgroup (wg)。
但是 worker1 会做什么呢?
func worker1(var1 chan string, var2 chan string, var3 chan string, wg *sync.WaitGroup) {
defer wg.Done()
for ch := range var1 {
var2 <- ch
}
}
worker1 将等待通道 var1 从该通道获取数据并将其传递给通道 var2。
这很好。你绝对不需要这个 time.Sleep(time.Second).
我们下一步
您现在创建一个新循环,它将生成另外 15 个 go 例程 (worker2)。
for i := 0; i < 15; i++ {
time.Sleep(time.Second)
wg.Add(1)
go worker2(var2, var3, &wg)
}
worker2 将从通道 var2 获取所有内容并将其传递给通道 var3
func worker2(var2 chan string, var3 chan string, wg *sync.WaitGroup) {
defer wg.Done()
for ch := range var2 {
var3 <- ch
}
}
现在您为 worker3 创建另外 15 个 go 例程。
for i := 0; i < 15; i++ {
time.Sleep(time.Second)
wg.Add(1)
go worker3(var3, &wg)
}
并且 worker3 从通道 var3 中获取所有内容,通过将其附加到计数字符串来处理该数据
最后一段代码是将数据播种到频道。该循环从 0 - 100000 开始,对于每个数字,将它们转换为字符串并将其传递给通道 var1
下一个程序将等待所有 go routine 完成并打印结果。
好的,这段代码有一些问题。
- 你绝对不需要这个 time.Sleep(time.Second) 在每个 go routine 之前你也不需要 time.Sleep 在 wg.Wait() 之前。
- 此类工作负载不需要缓冲通道。这是一个简单的管道,你可以使用无缓冲通道,通道将用于任务之间的同步。
当您更改代码以使用无缓冲通道并删除这些 time.Sleep 时,您仍然有问题。问题是 go lang runtime show 的错误:
fatal error: all goroutines are asleep - deadlock!
并终止代码。
但为什么会发生这种情况,我们有 sync.WaitGroup 并且一切看起来都很好。让我们看一个具有相同错误的更简单的程序。
package main
import (
"log"
"strconv"
"sync"
)
func worker(var1 <-chan string, wg *sync.WaitGroup) {
defer wg.Done()
for e := range var1 {
log.Printf("Element e %s ", e)
}
}
func main() {
var1 := make(chan string)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go worker(var1, &wg)
}
for i := 0; i < 15; i++ {
var1 <- strconv.Itoa(i)
}
wg.Wait()
}
此代码也会产生与您的代码相同的错误。这是因为这些通道永远不会关闭,并且 go routines(worker)将永远等待通道中的新数据。 Go 运行时检测到并终止进程。
为了防止这种类型的错误,我们需要添加一些机制来告诉 gorutine 我们已经完成并且 go routine 可以停止监听该通道并正确完成。
发送该信号的最简单方法是关闭该 goroutine 读取的通道。这是解决问题的代码。
package main
import (
"log"
"strconv"
"sync"
)
func worker(var1 <-chan string, wg *sync.WaitGroup) {
defer wg.Done()
for e := range var1 {
log.Printf("Element e %s ", e)
}
}
func main() {
var1 := make(chan string)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go worker(var1, &wg)
}
for i := 0; i < 15; i++ {
var1 <- strconv.Itoa(i)
}
close(var1)
wg.Wait()
}
并且此代码不会产生错误。此代码将正确终止。
不过有个窍门。你如何在你的代码中做到这一点?有 15 个从 var1 通道读取的 go 例程,15 个从 var2 通道读取的 goroutine,15 个从 var3 通道读取。
很难知道什么时候可以关闭哪个频道。我们知道通道 var1 首先处理数据,因此我们可以在生产者完成同步通道中的插入数据时关闭它们。原因是在读取以前的数据之前,我们无法将新数据插入通道。因此,当 producer 插入所有数据时,我们知道通道 var1 上的所有数据都已处理,因此关闭通道是安全的。但是频道 var2 和 var3.
有 15 个不同的 go 例程等待通道 var2,15 个等待 var3,这意味着我们需要找到一种方法在 var2 上的所有处理完成后关闭 var2(在所有 goroutines worker1 ), var3 也一样。这可以通过创建两个额外的 goroutine
wg1 和 wg2 并使用该 goroutine 为 worker1 和 worker2 生成 goroutine,这些 goroutine 将作为编排器工作,在这些函数内部我们只为 worker1 和 worker2 创建新的 sync.Group 这些函数将知道当所有这些子 goroutines 完成时。因此,对于 wg1,当所有这些 worker1 goroutine 完成后,我们可以安全地关闭 var2 通道。 wg2 和 var3 通道相同。
这些是 wg1 和 wg2 函数
// wg1
wg.Add(1)
go func() {
log.Printf("Starting WG1 master go routine")
var wg1 sync.WaitGroup
defer func() {
close(var2)
wg.Done()
}()
for i := 0; i < 15; i++ {
wg1.Add(1)
go worker1(var1, var2, &wg1)
}
wg1.Wait()
}()
// wg2
wg.Add(1)
go func() {
log.Printf("Starting WG2 routine for second stage")
defer func() {
close(var3)
wg.Done()
}()
var wg2 sync.WaitGroup
for i := 0; i < 15; i++ {
wg2.Add(1)
go worker2(var2, var3, &wg2)
}
wg2.Wait()
}()
您可以在以下位置找到完整的工作代码:
Working example
能否让多个go routines监听多个channels,在打印所有问题时都面临issue。
我无法打印所有数字我该如何改进这段代码
如果可能的话,任何人都可以提供一些例子,因为我正在努力解决这个例子。
每个围棋例程后是否需要time.sleep
package main
import (
"fmt"
"strconv"
"sync"
"time"
)
var count string
func worker3(var3 chan string, wg *sync.WaitGroup) {
defer wg.Done()
for ch := range var3 {
count += ch + " "
}
}
func worker2(var2 chan string, var3 chan string, wg *sync.WaitGroup) {
defer wg.Done()
for ch := range var2 {
var3 <- ch
}
}
func worker1(var1 chan string, var2 chan string, var3 chan string, wg *sync.WaitGroup) {
defer wg.Done()
for ch := range var1 {
var2 <- ch
}
}
func main() {
var1 := make(chan string, 1500)
var2 := make(chan string, 1500)
var3 := make(chan string, 1500)
var wg sync.WaitGroup
count = ""
for i := 0; i < 15; i++ {
time.Sleep(time.Second)
wg.Add(1)
go worker1(var1, var2, var3, &wg)
}
for i := 0; i < 15; i++ {
time.Sleep(time.Second)
wg.Add(1)
go worker2(var2, var3, &wg)
}
for i := 0; i < 15; i++ {
time.Sleep(time.Second)
wg.Add(1)
go worker3(var3, &wg)
}
for i := 0; i <= 100000; i++ {
var1 <- strconv.Itoa(i)
}
time.Sleep(time.Second)
wg.Wait()
fmt.Println(count)
}
是的,这很复杂,但有一些经验法则应该会让事情变得更简单。
- 更喜欢对传递给 go-routines 的通道使用形式参数,而不是在全局范围内访问通道。您可以通过这种方式获得更多的编译器检查,以及更好的模块化。
- 避免在特定的 go-routine(包括 'main' 例程)中在同一通道上读取和写入。否则,死锁的风险要大得多。
让我们看看您的程序在做什么。 您首先初始化了三个缓冲通道 变量 1、变量 2、变量 3
var1 := make(chan string, 1500)
var2 := make(chan string, 1500)
var3 := make(chan string, 1500)
现在你初始化了一个 WaitGroup (wg)
var wg sync.WaitGroup
现在你定义了变量 count 并且该变量是空字符串
count = ""
下一部分是从 0 到 15 并生成 15 个 worker1 go routines 的循环
for i := 0; i < 15; i++ {
time.Sleep(time.Second)
wg.Add(1)
go worker1(var1, var2, var3, &wg)
}
每次启动一个 worker1 go routine 并将通道和指针传递给 worker1 中的 waitgroup (wg)。
但是 worker1 会做什么呢?
func worker1(var1 chan string, var2 chan string, var3 chan string, wg *sync.WaitGroup) {
defer wg.Done()
for ch := range var1 {
var2 <- ch
}
}
worker1 将等待通道 var1 从该通道获取数据并将其传递给通道 var2。
这很好。你绝对不需要这个 time.Sleep(time.Second).
我们下一步
您现在创建一个新循环,它将生成另外 15 个 go 例程 (worker2)。
for i := 0; i < 15; i++ {
time.Sleep(time.Second)
wg.Add(1)
go worker2(var2, var3, &wg)
}
worker2 将从通道 var2 获取所有内容并将其传递给通道 var3
func worker2(var2 chan string, var3 chan string, wg *sync.WaitGroup) {
defer wg.Done()
for ch := range var2 {
var3 <- ch
}
}
现在您为 worker3 创建另外 15 个 go 例程。
for i := 0; i < 15; i++ {
time.Sleep(time.Second)
wg.Add(1)
go worker3(var3, &wg)
}
并且 worker3 从通道 var3 中获取所有内容,通过将其附加到计数字符串来处理该数据
最后一段代码是将数据播种到频道。该循环从 0 - 100000 开始,对于每个数字,将它们转换为字符串并将其传递给通道 var1
下一个程序将等待所有 go routine 完成并打印结果。
好的,这段代码有一些问题。
- 你绝对不需要这个 time.Sleep(time.Second) 在每个 go routine 之前你也不需要 time.Sleep 在 wg.Wait() 之前。
- 此类工作负载不需要缓冲通道。这是一个简单的管道,你可以使用无缓冲通道,通道将用于任务之间的同步。
当您更改代码以使用无缓冲通道并删除这些 time.Sleep 时,您仍然有问题。问题是 go lang runtime show 的错误:
fatal error: all goroutines are asleep - deadlock!
并终止代码。
但为什么会发生这种情况,我们有 sync.WaitGroup 并且一切看起来都很好。让我们看一个具有相同错误的更简单的程序。
package main
import (
"log"
"strconv"
"sync"
)
func worker(var1 <-chan string, wg *sync.WaitGroup) {
defer wg.Done()
for e := range var1 {
log.Printf("Element e %s ", e)
}
}
func main() {
var1 := make(chan string)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go worker(var1, &wg)
}
for i := 0; i < 15; i++ {
var1 <- strconv.Itoa(i)
}
wg.Wait()
}
此代码也会产生与您的代码相同的错误。这是因为这些通道永远不会关闭,并且 go routines(worker)将永远等待通道中的新数据。 Go 运行时检测到并终止进程。
为了防止这种类型的错误,我们需要添加一些机制来告诉 gorutine 我们已经完成并且 go routine 可以停止监听该通道并正确完成。
发送该信号的最简单方法是关闭该 goroutine 读取的通道。这是解决问题的代码。
package main
import (
"log"
"strconv"
"sync"
)
func worker(var1 <-chan string, wg *sync.WaitGroup) {
defer wg.Done()
for e := range var1 {
log.Printf("Element e %s ", e)
}
}
func main() {
var1 := make(chan string)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go worker(var1, &wg)
}
for i := 0; i < 15; i++ {
var1 <- strconv.Itoa(i)
}
close(var1)
wg.Wait()
}
并且此代码不会产生错误。此代码将正确终止。
不过有个窍门。你如何在你的代码中做到这一点?有 15 个从 var1 通道读取的 go 例程,15 个从 var2 通道读取的 goroutine,15 个从 var3 通道读取。
很难知道什么时候可以关闭哪个频道。我们知道通道 var1 首先处理数据,因此我们可以在生产者完成同步通道中的插入数据时关闭它们。原因是在读取以前的数据之前,我们无法将新数据插入通道。因此,当 producer 插入所有数据时,我们知道通道 var1 上的所有数据都已处理,因此关闭通道是安全的。但是频道 var2 和 var3.
有 15 个不同的 go 例程等待通道 var2,15 个等待 var3,这意味着我们需要找到一种方法在 var2 上的所有处理完成后关闭 var2(在所有 goroutines worker1 ), var3 也一样。这可以通过创建两个额外的 goroutine
wg1 和 wg2 并使用该 goroutine 为 worker1 和 worker2 生成 goroutine,这些 goroutine 将作为编排器工作,在这些函数内部我们只为 worker1 和 worker2 创建新的 sync.Group 这些函数将知道当所有这些子 goroutines 完成时。因此,对于 wg1,当所有这些 worker1 goroutine 完成后,我们可以安全地关闭 var2 通道。 wg2 和 var3 通道相同。
这些是 wg1 和 wg2 函数
// wg1
wg.Add(1)
go func() {
log.Printf("Starting WG1 master go routine")
var wg1 sync.WaitGroup
defer func() {
close(var2)
wg.Done()
}()
for i := 0; i < 15; i++ {
wg1.Add(1)
go worker1(var1, var2, &wg1)
}
wg1.Wait()
}()
// wg2
wg.Add(1)
go func() {
log.Printf("Starting WG2 routine for second stage")
defer func() {
close(var3)
wg.Done()
}()
var wg2 sync.WaitGroup
for i := 0; i < 15; i++ {
wg2.Add(1)
go worker2(var2, var3, &wg2)
}
wg2.Wait()
}()
您可以在以下位置找到完整的工作代码: Working example