如何等到缓冲通道(信号量)为空?
How to wait until buffered channel (semaphore) is empty?
我有一个整数片段,它们是并发操作的:
ints := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
我正在使用缓冲通道作为信号量,以便有一个并发 运行 go 例程的上限:
sem := make(chan struct{}, 2)
for _, i := range ints {
// acquire semaphore
sem <- struct{}{}
// start long running go routine
go func(id int, sem chan struct{}) {
// do something
// release semaphore
<- sem
}(i, sem)
}
上面的代码在达到最后一个或最后两个整数之前运行良好,因为程序在最后一个 go 例程完成之前结束。
问题:如何等待缓冲通道耗尽?
您不能以这种方式使用信号量(在本例中为通道)。当您处理值和分派更多 goroutine 时,无法保证它在任何时候都不会为空。在这种情况下,这不是一个问题,特别是因为您正在同步调度工作,但是因为没有无竞争的方式来检查通道的长度,所以没有等待通道长度达到 0 的原语。
使用sync.WaitGroup
等待所有goroutines完成
sem := make(chan struct{}, 2)
var wg sync.WaitGroup
for _, i := range ints {
wg.Add(1)
// acquire semaphore
sem <- struct{}{}
// start long running go routine
go func(id int) {
defer wg.Done()
// do something
// release semaphore
<-sem
}(i)
}
wg.Wait()
显然没有人在等待您的 go-routines 完成。因此程序在最后 2 个 go-routines 完成之前结束。你可以使用一个工作组来等待你所有的 go-routines 在程序结束之前完成。这说明得更好 - https://nathanleclaire.com/blog/2014/02/15/how-to-wait-for-all-goroutines-to-finish-executing-before-continuing/
使用"worker pool"处理您的数据。对于 each int,它比 运行 goroutine 便宜,为它里面的变量分配内存等等...
ints := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
ch := make(chan int)
var wg sync.WaitGroup
// run worker pool
for i := 2; i > 0; i-- {
wg.Add(1)
go func() {
defer wg.Done()
for id := range ch {
// do something
fmt.Println(id)
}
}()
}
// send ints to workers
for _, i := range ints {
ch <- i
}
close(ch)
wg.Wait()
您可以在 for 循环中等待 "sub-goroutines" 当前 goroutine
semLimit := 2
sem := make(chan struct{}, semLimit)
for _, i := range ints {
// acquire semaphore
sem <- struct{}{}
// start long running go routine
go func(id int, sem chan struct{}) {
// do something
// release semaphore
<- sem
}(i, sem)
}
// wait semaphore
for i := 0; i < semLimit; i++ {
wg<-struct{}{}
}
可选地,也可以用 import sync
的经济性来编写一个简约的 "semaphored waitgroup"
semLimit := 2
// mini semaphored waitgroup
wg := make(chan struct{}, semLimit)
// mini methods
wgAdd := func(){ wg<-struct{}{} }
wgDone := func(){ <-wg }
wgWait := func(){ for i := 0; i < semLimit; i++ { wgAdd() } }
for _, i := range ints {
// acquire semaphore
wgAdd()
// start long running go routine
go func(id int, sem chan struct{}) {
// do something
// release semaphore
wgDone()
}(i, sem)
}
// wait semaphore
wgWait()
这是一个工作示例。最后的 for
循环强制程序等待
直到作业完成:
package main
import "time"
func w(n int, e chan error) {
// do something
println(n)
time.Sleep(time.Second)
// release semaphore
<-e
}
func main() {
a := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
e := make(chan error, 2)
for _, n := range a {
// acquire semaphore
e <- nil
// start long running go routine
go w(n, e)
}
for n := cap(e); n > 0; n-- {
e <- nil
}
}
我有一个整数片段,它们是并发操作的:
ints := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
我正在使用缓冲通道作为信号量,以便有一个并发 运行 go 例程的上限:
sem := make(chan struct{}, 2)
for _, i := range ints {
// acquire semaphore
sem <- struct{}{}
// start long running go routine
go func(id int, sem chan struct{}) {
// do something
// release semaphore
<- sem
}(i, sem)
}
上面的代码在达到最后一个或最后两个整数之前运行良好,因为程序在最后一个 go 例程完成之前结束。
问题:如何等待缓冲通道耗尽?
您不能以这种方式使用信号量(在本例中为通道)。当您处理值和分派更多 goroutine 时,无法保证它在任何时候都不会为空。在这种情况下,这不是一个问题,特别是因为您正在同步调度工作,但是因为没有无竞争的方式来检查通道的长度,所以没有等待通道长度达到 0 的原语。
使用sync.WaitGroup
等待所有goroutines完成
sem := make(chan struct{}, 2)
var wg sync.WaitGroup
for _, i := range ints {
wg.Add(1)
// acquire semaphore
sem <- struct{}{}
// start long running go routine
go func(id int) {
defer wg.Done()
// do something
// release semaphore
<-sem
}(i)
}
wg.Wait()
显然没有人在等待您的 go-routines 完成。因此程序在最后 2 个 go-routines 完成之前结束。你可以使用一个工作组来等待你所有的 go-routines 在程序结束之前完成。这说明得更好 - https://nathanleclaire.com/blog/2014/02/15/how-to-wait-for-all-goroutines-to-finish-executing-before-continuing/
使用"worker pool"处理您的数据。对于 each int,它比 运行 goroutine 便宜,为它里面的变量分配内存等等...
ints := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
ch := make(chan int)
var wg sync.WaitGroup
// run worker pool
for i := 2; i > 0; i-- {
wg.Add(1)
go func() {
defer wg.Done()
for id := range ch {
// do something
fmt.Println(id)
}
}()
}
// send ints to workers
for _, i := range ints {
ch <- i
}
close(ch)
wg.Wait()
您可以在 for 循环中等待 "sub-goroutines" 当前 goroutine
semLimit := 2
sem := make(chan struct{}, semLimit)
for _, i := range ints {
// acquire semaphore
sem <- struct{}{}
// start long running go routine
go func(id int, sem chan struct{}) {
// do something
// release semaphore
<- sem
}(i, sem)
}
// wait semaphore
for i := 0; i < semLimit; i++ {
wg<-struct{}{}
}
可选地,也可以用 import sync
semLimit := 2
// mini semaphored waitgroup
wg := make(chan struct{}, semLimit)
// mini methods
wgAdd := func(){ wg<-struct{}{} }
wgDone := func(){ <-wg }
wgWait := func(){ for i := 0; i < semLimit; i++ { wgAdd() } }
for _, i := range ints {
// acquire semaphore
wgAdd()
// start long running go routine
go func(id int, sem chan struct{}) {
// do something
// release semaphore
wgDone()
}(i, sem)
}
// wait semaphore
wgWait()
这是一个工作示例。最后的 for
循环强制程序等待
直到作业完成:
package main
import "time"
func w(n int, e chan error) {
// do something
println(n)
time.Sleep(time.Second)
// release semaphore
<-e
}
func main() {
a := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
e := make(chan error, 2)
for _, n := range a {
// acquire semaphore
e <- nil
// start long running go routine
go w(n, e)
}
for n := cap(e); n > 0; n-- {
e <- nil
}
}