如何等到缓冲通道(信号量)为空?

How to wait until buffered channel (semaphore) is empty?

我有一个整数片段,它们是并发操作的:

ints := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

我正在使用缓冲通道作为信号量,以便有一个并发 运行 go 例程的上限:

sem := make(chan struct{}, 2)

for _, i := range ints {
  // acquire semaphore
  sem <- struct{}{}

  // start long running go routine
  go func(id int, sem chan struct{}) {
    // do something

    // release semaphore
    <- sem
  }(i, sem)
}

上面的代码在达到最后一个或最后两个整数之前运行良好,因为程序在最后一个 go 例程完成之前结束。

问题:如何等待缓冲通道耗尽?

您不能以这种方式使用信号量(在本例中为通道)。当您处理值和分派更多 goroutine 时,无法保证它在任何时候都不会为空。在这种情况下,这不是一个问题,特别是因为您正在同步调度工作,但是因为没有无竞争的方式来检查通道的长度,所以没有等待通道长度达到 0 的原语。

使用sync.WaitGroup等待所有goroutines完成

sem := make(chan struct{}, 2)

var wg sync.WaitGroup

for _, i := range ints {
    wg.Add(1)
    // acquire semaphore
    sem <- struct{}{}
    // start long running go routine
    go func(id int) {
        defer wg.Done()
        // do something
        // release semaphore
        <-sem
    }(i)
}

wg.Wait()

显然没有人在等待您的 go-routines 完成。因此程序在最后 2 个 go-routines 完成之前结束。你可以使用一个工作组来等待你所有的 go-routines 在程序结束之前完成。这说明得更好 - https://nathanleclaire.com/blog/2014/02/15/how-to-wait-for-all-goroutines-to-finish-executing-before-continuing/

使用"worker pool"处理您的数据。对于 each int,它比 运行 goroutine 便宜,为它里面的变量分配内存等等...

ints := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

ch := make(chan int)

var wg sync.WaitGroup

// run worker pool
for i := 2; i > 0; i-- {
    wg.Add(1)

    go func() {
        defer wg.Done()

        for id := range ch {
            // do something
            fmt.Println(id)
        }
    }()
}

// send ints to workers
for _, i := range ints {
    ch <- i
}

close(ch)

wg.Wait()

您可以在 for 循环中等待 "sub-goroutines" 当前 goroutine

semLimit := 2
sem := make(chan struct{}, semLimit)

for _, i := range ints {
  // acquire semaphore
  sem <- struct{}{}

  // start long running go routine
  go func(id int, sem chan struct{}) {
    // do something

    // release semaphore
    <- sem
  }(i, sem)
}

// wait semaphore
for i := 0; i < semLimit; i++ { 
  wg<-struct{}{} 
}

可选地,也可以用 import sync

的经济性来编写一个简约的 "semaphored waitgroup"
semLimit := 2
// mini semaphored waitgroup 
wg := make(chan struct{}, semLimit)
// mini methods
wgAdd := func(){ wg<-struct{}{} }
wgDone := func(){ <-wg }
wgWait := func(){ for i := 0; i < semLimit; i++ { wgAdd() } }

for _, i := range ints {
  // acquire semaphore
  wgAdd()

  // start long running go routine
  go func(id int, sem chan struct{}) {
    // do something

    // release semaphore
    wgDone()
  }(i, sem)
}

// wait semaphore
wgWait()

这是一个工作示例。最后的 for 循环强制程序等待 直到作业完成:

package main
import "time"

func w(n int, e chan error) {
   // do something
   println(n)
   time.Sleep(time.Second)
   // release semaphore
   <-e
}

func main() {
   a := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
   e := make(chan error, 2)
   for _, n := range a {
      // acquire semaphore
      e <- nil
      // start long running go routine
      go w(n, e)
   }
   for n := cap(e); n > 0; n-- {
      e <- nil
   }
}