在 Go 中使用 sync.Atomic 包防止竞争条件

Preventing race condition using the sync.Atomic package in Go

我在 Go 中实现了以下计数器,我想同时使用它。我正在使用原子包来存储状态,但不确定我是否可以 运行 进入任何竞争条件。我是否还需要添加一个额外的互斥锁以防止递增到零以下,或者原子操作是否提供足够的安全性?谢谢!

type Counter struct {
    counter  uint64
    finished uint32
    sync.Mutex
}

// Inc increments the counter by one
func (c *Counter) Inc() error {
    if c.isFinished() {
        return fmt.Errorf("counter is finished")
    }

    atomic.AddUint64(&c.counter, 1)
    return nil
}

// Dec decrements the counter by one, but prevents the counter from going to zero
func (c *Counter) Dec() {
    // prevent overflow
    if !c.isZero() {
        atomic.AddUint64(&c.counter, ^uint64(0))
    }
}

// Cancel sets the finished flag, and sets counter to zero
func (c *Counter) Cancel() {
    if !c.isFinished() {
        atomic.StoreUint32(&c.finished, 1)
        atomic.StoreUint64(&c.counter, 0)
    }
}

// isFinished returns true if finished
func (c *Counter) isFinished() bool {
    return atomic.LoadUint32(&c.finished) == uint32(1)
}

// isZero returns true if counter is zero
func (c *Counter) isZero() bool {
    return atomic.LoadUint64(&c.counter) == uint64(0)
}

更新:通过 运行 将下面的代码与 -race 标志结合起来,我能够检测到我确实需要包含互斥体。

var counter *Counter = &Counter{}

func main() {
    wg := sync.WaitGroup{}

    numberOfLoops := 10
    wg.Add(numberOfLoops)

    for i := 0; i < numberOfLoops; i++ {
        go incrementor(&wg)
    }

    wg.Wait()
    fmt.Println("Final Counter:", counter.counter)
}

func incrementor(wg *sync.WaitGroup) {
    rand.Seed(time.Now().UnixNano())
    for i := 0; i < 20; i++ {
        counter.Inc()
        time.Sleep(time.Duration(rand.Intn(3)) * time.Millisecond)
    }
    wg.Done()
    counter.Cancel()
}

Playground link

您不需要额外的互斥锁,您的主要功能是在不使用原子加载的情况下读取 counter.counter,而您的 incrementorcounter.Cancel() 之前调用 wg.Done(),因此你得到了竞争条件。

通过在 counter.Cancel() 之后移动 wg.Done(),竞争条件得到解决:

func main() {
    wg := sync.WaitGroup{}

    numberOfLoops := 10
    wg.Add(numberOfLoops)

    for i := 0; i < numberOfLoops; i++ {
        go incrementor(&wg)
    }

    wg.Wait()
    fmt.Println("Final Counter:", counter.counter)
}

func incrementor(wg *sync.WaitGroup) {
    rand.Seed(time.Now().UnixNano())
    for i := 0; i < 20; i++ {
        counter.Inc()
        time.Sleep(time.Duration(rand.Intn(3)) * time.Millisecond)
    }
    counter.Cancel()
    wg.Done()
}
  • 避免竞争,如“可由竞争检测器检测” :

正如@caveman 回答的那样,您遇到的问题与 wg.Done() / wg.Wait() 指令排序中的一个问题有关,而且您没有使用 atomic.Load() 来访问 counter.counter.

对于这种竞争条件,您的方法是“安全的”。

  • 避免竞争,如“不要达到计数器不一致的状态” :

有问题,(*) 因为你的方法 运行 几个连续的指令来检查和更新对象(例如:if condition { update_fields }) ,并且当您应用 update_fields.

时,您没有同步机制来检查 condition 是否仍然为真

通过将 incrementor 函数更改为:

func incrementor(wg *sync.WaitGroup) {
    for i := 0; i < 20000; i++ {
        counter.Inc()
    }
    counter.Cancel()
    wg.Done()
}

和运行将您的程序多次运行,您应该能够看到“Final Counter:”并不总是以 0 (playground) 结尾。

这是如何发生这种情况的说明:

  • 假设goroutine 1执行counter.Cancel()
  • 而 goroutine 2 执行 counter.Inc()

可能会出现以下执行顺序:

   goroutine 1                                 goroutine 2

                                            1. if c.isFinished() {
                                                   return fmt.Errorf("counter is finished")
                                               }
2. if !c.isFinished() {                         
3.     atomic.StoreUint32(&c.finished, 1)       
4.     atomic.StoreUint64(&c.counter, 0)        
   }                                            
                                            5. atomic.AddUint64(&c.counter, 1)
                                            6. return nil
  • .Inc() 中的 c.isFinished() 指令可能发生 .Cancel() 执行之前,
  • 并且 atomic.AddUint64(&c.counter, 1) 可能会发生 .Cancel() 将计数器重置为零之后。

要避免这种竞争,您需要选择一种方式来同步 inspect + update 指令。

一种常见的方法是使用互斥量:

type Counter struct {
    counter  uint64
    finished uint32
    m        sync.Mutex
}

// Inc increments the counter by one
func (c *Counter) Inc() error {
    c.m.Lock()
    defer c.m.Unlock()

    if c.finished != 0 {
        return fmt.Errorf("counter is finished")
    }

    c.counter++
    return nil
}

// Dec decrements the counter by one, but prevents the counter from going to zero
func (c *Counter) Dec() {
    c.m.Lock()
    defer c.m.Unlock()

    // prevent overflow
    if c.counter > 0 {
        c.counter--
    }
}

// Cancel sets the finished flag, and sets counter to zero
func (c *Counter) Cancel() {
    c.m.Lock()
    defer c.m.Unlock()

    if c.finished == 0 {
        c.finished = 1
        c.counter = 0
    }
}

playground


(*) [edit] 我最初写道:“因为你的方法检查对象的两个不同字段”,但即使只有一个字段,你也可能遇到类似的问题.

例如,如果有两个goroutines 运行 下面的代码:

if c.LoadUint64(&c.counter) == 0 {
   atomic.AddUint64(&c.counter, 1)
}

您的最终值为 2。

问题在于有多条指令,这些指令不是以原子方式执行的。