关闭一个 Go 通道,并同步一个 Go 例程
Closing a Go channel, and syncing a go routine
我无法在 go 中终止我的 WaitGroup,因此无法退出范围循环。谁能告诉我为什么。或者更好的方法来限制 go 例程的数量,同时仍然能够在 chan 关闭时退出!
我见过的大多数示例都与静态类型的通道长度有关,但此通道会因其他进程而动态调整大小。
打印示例中的打印语句 ("DONE!") 表明 testValProducer 打印了正确的次数,但代码从未达到 ("--EXIT--"),这意味着 wg.Wait 仍然以某种方式阻塞。
type TestValContainer chan string
func StartFunc(){
testValContainer := make(TestValContainer)
go func(){testValContainer <- "string val 1"}()
go func(){testValContainer <- "string val 2"}()
go func(){testValContainer <- "string val 3"}()
go func(){testValContainer <- "string val 4"}()
go func(){testValContainer <- "string val 5"}()
go func(){testValContainer <- "string val 6"}()
go func(){testValContainer <- "string val 7"}()
wg := sync.WaitGroup{}
// limit the number of worker goroutines
for i:=0; i < 3; i++ {
wg.Add(1)
go func(){
v := i
fmt.Printf("launching %v", i)
for str := range testValContainer{
testValProducer(str, &wg)
}
fmt.Println(v, "--EXIT --") // never called
}()
}
wg.Wait()
close(testValContainer)
}
func get(url string){
http.Get(url)
ch <- getUnvisited()
}
func testValProducer(testStr string, wg *sync.WaitGroup){
doSomething(testStr)
fmt.Println("done !") // called
wg.Done() // NO EFFECT??
}
在你的例子中有两个错误:
- 您在每个工作线程的循环内部调用
wg.Done
,而不是在工作线程结束时(就在它完成之前)。对 wg.Done
的调用必须与 wg.Add(1)
一对一匹配。
- 修复后,主线程等待工作线程完成,而工作线程区域等待主线程关闭输入通道时出现死锁。
如果将 producer 端与 consumer 端更清楚地分开,逻辑将更清晰,更容易理解。 运行 每一方都有一个单独的 goroutine。示例:
// Producer side (only write and close allowed).
go func() {
testValContainer <- "string val 1"
testValContainer <- "string val 2"
testValContainer <- "string val 3"
testValContainer <- "string val 4"
testValContainer <- "string val 5"
testValContainer <- "string val 6"
testValContainer <- "string val 7"
close(testValContainer) // Signals that production is done.
}()
// Consumer side (only read allowed).
for i:=0; i < 3; i++ {
wg.Add(1)
go func() {
defer wg.Done()
v := i
fmt.Printf("launching %v", i)
for str := range testValContainer {
doSomething(str)
}
fmt.Println(v, "--EXIT --")
}()
}
wg.Wait()
如果项目是从其他来源生产的,可能是 goroutines 的集合,您仍然应该有:1) 一个单独的 goroutine 或逻辑在某处监督生产并在完成后调用 close
,或者 2) 让你的主线程等待生产端完成(例如 WaitGroup
等待 producer goroutines)并关闭通道 before 等待消费端
如果您考虑一下,无论您如何安排逻辑,您都需要 一些 "side-channel" 检测方式,在一个同步的地方,有不再产生消息。否则你永远不知道频道什么时候应该关闭。
换句话说,您不能等待消费者端的范围循环完成来触发 close
,因为这会导致捕获 22。
我可能会做这样的事情,它让一切都容易理解。我定义了一个结构,它实现了一个信号量来控制启动的活动 Go 例程的数量......并允许我在它们进入时从通道中读取。
package main
import (
"fmt"
"sync"
)
type TestValContainer struct {
wg sync.WaitGroup
sema chan struct{}
data chan int
}
func doSomething(number int) {
fmt.Println(number)
}
func main() {
//semaphore limit 10 routines at time
tvc := TestValContainer{
sema: make(chan struct{}, 10),
data: make(chan int),
}
for i := 0; i <= 100; i++ {
tvc.wg.Add(1)
go func(i int) {
tvc.sema <- struct{}{}
defer func() {
<-tvc.sema
tvc.wg.Done()
}()
tvc.data <- i
}(i)
}
// wait in the background so that waiting and closing the channel dont
// block the for loop below
go func() {
tvc.wg.Wait()
close(tvc.data)
}()
// get channel results
for res := range tvc.data {
doSomething(res)
}
}
我无法在 go 中终止我的 WaitGroup,因此无法退出范围循环。谁能告诉我为什么。或者更好的方法来限制 go 例程的数量,同时仍然能够在 chan 关闭时退出!
我见过的大多数示例都与静态类型的通道长度有关,但此通道会因其他进程而动态调整大小。
打印示例中的打印语句 ("DONE!") 表明 testValProducer 打印了正确的次数,但代码从未达到 ("--EXIT--"),这意味着 wg.Wait 仍然以某种方式阻塞。
type TestValContainer chan string
func StartFunc(){
testValContainer := make(TestValContainer)
go func(){testValContainer <- "string val 1"}()
go func(){testValContainer <- "string val 2"}()
go func(){testValContainer <- "string val 3"}()
go func(){testValContainer <- "string val 4"}()
go func(){testValContainer <- "string val 5"}()
go func(){testValContainer <- "string val 6"}()
go func(){testValContainer <- "string val 7"}()
wg := sync.WaitGroup{}
// limit the number of worker goroutines
for i:=0; i < 3; i++ {
wg.Add(1)
go func(){
v := i
fmt.Printf("launching %v", i)
for str := range testValContainer{
testValProducer(str, &wg)
}
fmt.Println(v, "--EXIT --") // never called
}()
}
wg.Wait()
close(testValContainer)
}
func get(url string){
http.Get(url)
ch <- getUnvisited()
}
func testValProducer(testStr string, wg *sync.WaitGroup){
doSomething(testStr)
fmt.Println("done !") // called
wg.Done() // NO EFFECT??
}
在你的例子中有两个错误:
- 您在每个工作线程的循环内部调用
wg.Done
,而不是在工作线程结束时(就在它完成之前)。对wg.Done
的调用必须与wg.Add(1)
一对一匹配。 - 修复后,主线程等待工作线程完成,而工作线程区域等待主线程关闭输入通道时出现死锁。
如果将 producer 端与 consumer 端更清楚地分开,逻辑将更清晰,更容易理解。 运行 每一方都有一个单独的 goroutine。示例:
// Producer side (only write and close allowed).
go func() {
testValContainer <- "string val 1"
testValContainer <- "string val 2"
testValContainer <- "string val 3"
testValContainer <- "string val 4"
testValContainer <- "string val 5"
testValContainer <- "string val 6"
testValContainer <- "string val 7"
close(testValContainer) // Signals that production is done.
}()
// Consumer side (only read allowed).
for i:=0; i < 3; i++ {
wg.Add(1)
go func() {
defer wg.Done()
v := i
fmt.Printf("launching %v", i)
for str := range testValContainer {
doSomething(str)
}
fmt.Println(v, "--EXIT --")
}()
}
wg.Wait()
如果项目是从其他来源生产的,可能是 goroutines 的集合,您仍然应该有:1) 一个单独的 goroutine 或逻辑在某处监督生产并在完成后调用 close
,或者 2) 让你的主线程等待生产端完成(例如 WaitGroup
等待 producer goroutines)并关闭通道 before 等待消费端
如果您考虑一下,无论您如何安排逻辑,您都需要 一些 "side-channel" 检测方式,在一个同步的地方,有不再产生消息。否则你永远不知道频道什么时候应该关闭。
换句话说,您不能等待消费者端的范围循环完成来触发 close
,因为这会导致捕获 22。
我可能会做这样的事情,它让一切都容易理解。我定义了一个结构,它实现了一个信号量来控制启动的活动 Go 例程的数量......并允许我在它们进入时从通道中读取。
package main
import (
"fmt"
"sync"
)
type TestValContainer struct {
wg sync.WaitGroup
sema chan struct{}
data chan int
}
func doSomething(number int) {
fmt.Println(number)
}
func main() {
//semaphore limit 10 routines at time
tvc := TestValContainer{
sema: make(chan struct{}, 10),
data: make(chan int),
}
for i := 0; i <= 100; i++ {
tvc.wg.Add(1)
go func(i int) {
tvc.sema <- struct{}{}
defer func() {
<-tvc.sema
tvc.wg.Done()
}()
tvc.data <- i
}(i)
}
// wait in the background so that waiting and closing the channel dont
// block the for loop below
go func() {
tvc.wg.Wait()
close(tvc.data)
}()
// get channel results
for res := range tvc.data {
doSomething(res)
}
}