为什么这个 golang 脚本让我陷入僵局? + 几个问题
Why is this golang script giving me a deadlock ? + a few questions
我从 github 上的某个人那里得到了这段代码,我正在尝试使用它来理解并发性。
package main
import (
"bufio"
"fmt"
"os"
"sync"
"time"
)
var wg sync.WaitGroup
func sad(url string) string {
fmt.Printf("gonna sleep a bit\n")
time.Sleep(2 * time.Second)
return url + " added stuff"
}
func main() {
sc := bufio.NewScanner(os.Stdin)
urls := make(chan string)
results := make(chan string)
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for url := range urls {
n := sad(url)
results <- n
}
}()
}
for sc.Scan() {
url := sc.Text()
urls <- url
}
for result := range results {
fmt.Printf("%s arrived\n", result)
}
wg.Wait()
close(urls)
close(results)
}
我有几个问题:
- 为什么这段代码会出现死锁?
- for 循环是如何存在的在从用户接收输入的操作之前,go 例程是否会等到 urls 通道中传递任何内容然后开始工作?我不明白这个,因为它不是顺序的,比如为什么从用户那里获取输入然后将每个输入放入 urls 通道然后 运行 go routines 被认为是错误的?
- 在 for 循环中,我有 另一个循环 ,它在 urls 通道上迭代,每个 go 例程是否只处理一行输入?还是一个 go 例程一次处理多行?这些是如何工作的?
- 我在这里收集的输出是否正确?
for 循环创建了 20 个 goroutine,它们都在等待来自 urls
通道的输入。当有人写入此通道时,其中一个 goroutine 将拾取它并在其中继续工作。这是一个典型的工作池实现。
然后,scanner 逐行读取输入,并将其发送到 urls
通道,其中一个 goroutine 将拾取它并将响应写入 results
通道。此时,没有其他 goroutines 从 results
通道读取,所以这将阻塞。
当扫描器读取 URL 时,所有其他 goroutine 将拾取它们并阻止。因此,如果扫描器读取超过 20 个 URL,它将死锁,因为所有 goroutine 都将等待 reader.
如果 URL 少于 20 个,扫描器 for 循环将结束,并读取结果。然而,这最终也会死锁,因为 for 循环将在通道关闭时终止,并且没有人可以关闭通道。
要解决此问题,首先,请在阅读完后立即关闭 urls
频道。这将释放 goroutines 中的所有 for 循环。然后,您应该将从 results
通道读取的 for 循环放入 goroutine 中,这样您就可以在处理结果时调用 wg.Wait
。 wg.Wait
后,您可以关闭results
频道。
这不能保证 results
频道中的所有项目都会被阅读。该程序可能会在处理完所有消息之前终止,因此请使用您在从 results
通道读取的 goroutine 末尾关闭的第三个通道。即:
done:=make(chan struct{})
go func() {
defer close(done)
for result := range results {
fmt.Printf("%s arrived\n", result)
}
}()
wg.Wait()
close(results)
<-done
大部分情况下您做事都正确,但也有一些不合时宜的地方。 for sc.Scan()
循环将继续,直到 Scanner 完成,而 for result := range results
循环永远不会 运行,因此没有 go 例程(在这种情况下为 'main')将能够接收来自results
。当 运行 使用您的示例时,我在 for sc.Scan()
之前启动了 for result := range results
循环,并且还在其自己的 go 例程中启动了循环——否则将永远无法到达 for sc.Scan()
。
go func() {
for result := range results {
fmt.Printf("%s arrived\n", result)
}
}()
for sc.Scan() {
url := sc.Text()
urls <- url
}
另外,因为你 运行 wg.Wait()
在 close(urls)
之前,主 goroutine 被阻塞等待 20 sad()
go routines 完成。但是在调用 close(urls)
之前他们无法完成。所以在等待等待组之前关闭那个通道。
close(urls)
wg.Wait()
close(results)
我对以前的答案不是很满意,所以这里有一个基于 go tour, the go doc, the specifications.
中记录的行为的解决方案
package main
import (
"bufio"
"fmt"
"strings"
"sync"
"time"
)
var wg sync.WaitGroup
func sad(url string) string {
fmt.Printf("gonna sleep a bit\n")
time.Sleep(2 * time.Millisecond)
return url + " added stuff"
}
func main() {
// sc := bufio.NewScanner(os.Stdin)
sc := bufio.NewScanner(strings.NewReader(strings.Repeat("blah blah\n", 15)))
urls := make(chan string)
results := make(chan string)
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for url := range urls {
n := sad(url)
results <- n
}
}()
}
// results is consumed by so many goroutines
// we must wait for them to finish before closing results
// but we dont want to block here, so put that into a routine.
go func() {
wg.Wait()
close(results)
}()
go func() {
for sc.Scan() {
url := sc.Text()
urls <- url
}
close(urls) // done consuming a channel, close it, right away.
}()
for result := range results {
fmt.Printf("%s arrived\n", result)
} // the program will finish when it gets out of this loop.
// It will get out of this loop because you have made sure the results channel is closed.
}
我从 github 上的某个人那里得到了这段代码,我正在尝试使用它来理解并发性。
package main
import (
"bufio"
"fmt"
"os"
"sync"
"time"
)
var wg sync.WaitGroup
func sad(url string) string {
fmt.Printf("gonna sleep a bit\n")
time.Sleep(2 * time.Second)
return url + " added stuff"
}
func main() {
sc := bufio.NewScanner(os.Stdin)
urls := make(chan string)
results := make(chan string)
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for url := range urls {
n := sad(url)
results <- n
}
}()
}
for sc.Scan() {
url := sc.Text()
urls <- url
}
for result := range results {
fmt.Printf("%s arrived\n", result)
}
wg.Wait()
close(urls)
close(results)
}
我有几个问题:
- 为什么这段代码会出现死锁?
- for 循环是如何存在的在从用户接收输入的操作之前,go 例程是否会等到 urls 通道中传递任何内容然后开始工作?我不明白这个,因为它不是顺序的,比如为什么从用户那里获取输入然后将每个输入放入 urls 通道然后 运行 go routines 被认为是错误的?
- 在 for 循环中,我有 另一个循环 ,它在 urls 通道上迭代,每个 go 例程是否只处理一行输入?还是一个 go 例程一次处理多行?这些是如何工作的?
- 我在这里收集的输出是否正确?
for 循环创建了 20 个 goroutine,它们都在等待来自 urls
通道的输入。当有人写入此通道时,其中一个 goroutine 将拾取它并在其中继续工作。这是一个典型的工作池实现。
然后,scanner 逐行读取输入,并将其发送到 urls
通道,其中一个 goroutine 将拾取它并将响应写入 results
通道。此时,没有其他 goroutines 从 results
通道读取,所以这将阻塞。
当扫描器读取 URL 时,所有其他 goroutine 将拾取它们并阻止。因此,如果扫描器读取超过 20 个 URL,它将死锁,因为所有 goroutine 都将等待 reader.
如果 URL 少于 20 个,扫描器 for 循环将结束,并读取结果。然而,这最终也会死锁,因为 for 循环将在通道关闭时终止,并且没有人可以关闭通道。
要解决此问题,首先,请在阅读完后立即关闭 urls
频道。这将释放 goroutines 中的所有 for 循环。然后,您应该将从 results
通道读取的 for 循环放入 goroutine 中,这样您就可以在处理结果时调用 wg.Wait
。 wg.Wait
后,您可以关闭results
频道。
这不能保证 results
频道中的所有项目都会被阅读。该程序可能会在处理完所有消息之前终止,因此请使用您在从 results
通道读取的 goroutine 末尾关闭的第三个通道。即:
done:=make(chan struct{})
go func() {
defer close(done)
for result := range results {
fmt.Printf("%s arrived\n", result)
}
}()
wg.Wait()
close(results)
<-done
大部分情况下您做事都正确,但也有一些不合时宜的地方。 for sc.Scan()
循环将继续,直到 Scanner 完成,而 for result := range results
循环永远不会 运行,因此没有 go 例程(在这种情况下为 'main')将能够接收来自results
。当 运行 使用您的示例时,我在 for sc.Scan()
之前启动了 for result := range results
循环,并且还在其自己的 go 例程中启动了循环——否则将永远无法到达 for sc.Scan()
。
go func() {
for result := range results {
fmt.Printf("%s arrived\n", result)
}
}()
for sc.Scan() {
url := sc.Text()
urls <- url
}
另外,因为你 运行 wg.Wait()
在 close(urls)
之前,主 goroutine 被阻塞等待 20 sad()
go routines 完成。但是在调用 close(urls)
之前他们无法完成。所以在等待等待组之前关闭那个通道。
close(urls)
wg.Wait()
close(results)
我对以前的答案不是很满意,所以这里有一个基于 go tour, the go doc, the specifications.
中记录的行为的解决方案package main
import (
"bufio"
"fmt"
"strings"
"sync"
"time"
)
var wg sync.WaitGroup
func sad(url string) string {
fmt.Printf("gonna sleep a bit\n")
time.Sleep(2 * time.Millisecond)
return url + " added stuff"
}
func main() {
// sc := bufio.NewScanner(os.Stdin)
sc := bufio.NewScanner(strings.NewReader(strings.Repeat("blah blah\n", 15)))
urls := make(chan string)
results := make(chan string)
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for url := range urls {
n := sad(url)
results <- n
}
}()
}
// results is consumed by so many goroutines
// we must wait for them to finish before closing results
// but we dont want to block here, so put that into a routine.
go func() {
wg.Wait()
close(results)
}()
go func() {
for sc.Scan() {
url := sc.Text()
urls <- url
}
close(urls) // done consuming a channel, close it, right away.
}()
for result := range results {
fmt.Printf("%s arrived\n", result)
} // the program will finish when it gets out of this loop.
// It will get out of this loop because you have made sure the results channel is closed.
}