在 Go 中使用 sync.Atomic 包防止竞争条件
Preventing race condition using the sync.Atomic package in Go
我在 Go 中实现了以下计数器,我想同时使用它。我正在使用原子包来存储状态,但不确定我是否可以 运行 进入任何竞争条件。我是否还需要添加一个额外的互斥锁以防止递增到零以下,或者原子操作是否提供足够的安全性?谢谢!
type Counter struct {
counter uint64
finished uint32
sync.Mutex
}
// Inc increments the counter by one
func (c *Counter) Inc() error {
if c.isFinished() {
return fmt.Errorf("counter is finished")
}
atomic.AddUint64(&c.counter, 1)
return nil
}
// Dec decrements the counter by one, but prevents the counter from going to zero
func (c *Counter) Dec() {
// prevent overflow
if !c.isZero() {
atomic.AddUint64(&c.counter, ^uint64(0))
}
}
// Cancel sets the finished flag, and sets counter to zero
func (c *Counter) Cancel() {
if !c.isFinished() {
atomic.StoreUint32(&c.finished, 1)
atomic.StoreUint64(&c.counter, 0)
}
}
// isFinished returns true if finished
func (c *Counter) isFinished() bool {
return atomic.LoadUint32(&c.finished) == uint32(1)
}
// isZero returns true if counter is zero
func (c *Counter) isZero() bool {
return atomic.LoadUint64(&c.counter) == uint64(0)
}
更新:通过 运行 将下面的代码与 -race
标志结合起来,我能够检测到我确实需要包含互斥体。
var counter *Counter = &Counter{}
func main() {
wg := sync.WaitGroup{}
numberOfLoops := 10
wg.Add(numberOfLoops)
for i := 0; i < numberOfLoops; i++ {
go incrementor(&wg)
}
wg.Wait()
fmt.Println("Final Counter:", counter.counter)
}
func incrementor(wg *sync.WaitGroup) {
rand.Seed(time.Now().UnixNano())
for i := 0; i < 20; i++ {
counter.Inc()
time.Sleep(time.Duration(rand.Intn(3)) * time.Millisecond)
}
wg.Done()
counter.Cancel()
}
您不需要额外的互斥锁,您的主要功能是在不使用原子加载的情况下读取 counter.counter
,而您的 incrementor
在 counter.Cancel()
之前调用 wg.Done()
,因此你得到了竞争条件。
通过在 counter.Cancel()
之后移动 wg.Done()
,竞争条件得到解决:
func main() {
wg := sync.WaitGroup{}
numberOfLoops := 10
wg.Add(numberOfLoops)
for i := 0; i < numberOfLoops; i++ {
go incrementor(&wg)
}
wg.Wait()
fmt.Println("Final Counter:", counter.counter)
}
func incrementor(wg *sync.WaitGroup) {
rand.Seed(time.Now().UnixNano())
for i := 0; i < 20; i++ {
counter.Inc()
time.Sleep(time.Duration(rand.Intn(3)) * time.Millisecond)
}
counter.Cancel()
wg.Done()
}
- 避免竞争,如“可由竞争检测器检测” :
正如@caveman 回答的那样,您遇到的问题与 wg.Done() / wg.Wait()
指令排序中的一个问题有关,而且您没有使用 atomic.Load()
来访问 counter.counter
.
对于这种竞争条件,您的方法是“安全的”。
- 避免竞争,如“不要达到计数器不一致的状态” :
你 有问题,(*) 因为你的方法 运行 几个连续的指令来检查和更新对象(例如:if condition { update_fields }
) ,并且当您应用 update_fields
.
时,您没有同步机制来检查 condition
是否仍然为真
通过将 incrementor
函数更改为:
func incrementor(wg *sync.WaitGroup) {
for i := 0; i < 20000; i++ {
counter.Inc()
}
counter.Cancel()
wg.Done()
}
和运行将您的程序多次运行,您应该能够看到“Final Counter:”并不总是以 0 (playground) 结尾。
这是如何发生这种情况的说明:
- 假设goroutine 1执行
counter.Cancel()
- 而 goroutine 2 执行
counter.Inc()
可能会出现以下执行顺序:
goroutine 1 goroutine 2
1. if c.isFinished() {
return fmt.Errorf("counter is finished")
}
2. if !c.isFinished() {
3. atomic.StoreUint32(&c.finished, 1)
4. atomic.StoreUint64(&c.counter, 0)
}
5. atomic.AddUint64(&c.counter, 1)
6. return nil
.Inc()
中的 c.isFinished()
指令可能发生 在 .Cancel()
执行之前,
- 并且
atomic.AddUint64(&c.counter, 1)
可能会发生 在 .Cancel()
将计数器重置为零之后。
要避免这种竞争,您需要选择一种方式来同步 inspect + update
指令。
一种常见的方法是使用互斥量:
type Counter struct {
counter uint64
finished uint32
m sync.Mutex
}
// Inc increments the counter by one
func (c *Counter) Inc() error {
c.m.Lock()
defer c.m.Unlock()
if c.finished != 0 {
return fmt.Errorf("counter is finished")
}
c.counter++
return nil
}
// Dec decrements the counter by one, but prevents the counter from going to zero
func (c *Counter) Dec() {
c.m.Lock()
defer c.m.Unlock()
// prevent overflow
if c.counter > 0 {
c.counter--
}
}
// Cancel sets the finished flag, and sets counter to zero
func (c *Counter) Cancel() {
c.m.Lock()
defer c.m.Unlock()
if c.finished == 0 {
c.finished = 1
c.counter = 0
}
}
(*) [edit] 我最初写道:“因为你的方法检查对象的两个不同字段”,但即使只有一个字段,你也可能遇到类似的问题.
例如,如果有两个goroutines 运行 下面的代码:
if c.LoadUint64(&c.counter) == 0 {
atomic.AddUint64(&c.counter, 1)
}
您的最终值为 2。
问题在于有多条指令,这些指令不是以原子方式执行的。
我在 Go 中实现了以下计数器,我想同时使用它。我正在使用原子包来存储状态,但不确定我是否可以 运行 进入任何竞争条件。我是否还需要添加一个额外的互斥锁以防止递增到零以下,或者原子操作是否提供足够的安全性?谢谢!
type Counter struct {
counter uint64
finished uint32
sync.Mutex
}
// Inc increments the counter by one
func (c *Counter) Inc() error {
if c.isFinished() {
return fmt.Errorf("counter is finished")
}
atomic.AddUint64(&c.counter, 1)
return nil
}
// Dec decrements the counter by one, but prevents the counter from going to zero
func (c *Counter) Dec() {
// prevent overflow
if !c.isZero() {
atomic.AddUint64(&c.counter, ^uint64(0))
}
}
// Cancel sets the finished flag, and sets counter to zero
func (c *Counter) Cancel() {
if !c.isFinished() {
atomic.StoreUint32(&c.finished, 1)
atomic.StoreUint64(&c.counter, 0)
}
}
// isFinished returns true if finished
func (c *Counter) isFinished() bool {
return atomic.LoadUint32(&c.finished) == uint32(1)
}
// isZero returns true if counter is zero
func (c *Counter) isZero() bool {
return atomic.LoadUint64(&c.counter) == uint64(0)
}
更新:通过 运行 将下面的代码与 -race
标志结合起来,我能够检测到我确实需要包含互斥体。
var counter *Counter = &Counter{}
func main() {
wg := sync.WaitGroup{}
numberOfLoops := 10
wg.Add(numberOfLoops)
for i := 0; i < numberOfLoops; i++ {
go incrementor(&wg)
}
wg.Wait()
fmt.Println("Final Counter:", counter.counter)
}
func incrementor(wg *sync.WaitGroup) {
rand.Seed(time.Now().UnixNano())
for i := 0; i < 20; i++ {
counter.Inc()
time.Sleep(time.Duration(rand.Intn(3)) * time.Millisecond)
}
wg.Done()
counter.Cancel()
}
您不需要额外的互斥锁,您的主要功能是在不使用原子加载的情况下读取 counter.counter
,而您的 incrementor
在 counter.Cancel()
之前调用 wg.Done()
,因此你得到了竞争条件。
通过在 counter.Cancel()
之后移动 wg.Done()
,竞争条件得到解决:
func main() {
wg := sync.WaitGroup{}
numberOfLoops := 10
wg.Add(numberOfLoops)
for i := 0; i < numberOfLoops; i++ {
go incrementor(&wg)
}
wg.Wait()
fmt.Println("Final Counter:", counter.counter)
}
func incrementor(wg *sync.WaitGroup) {
rand.Seed(time.Now().UnixNano())
for i := 0; i < 20; i++ {
counter.Inc()
time.Sleep(time.Duration(rand.Intn(3)) * time.Millisecond)
}
counter.Cancel()
wg.Done()
}
- 避免竞争,如“可由竞争检测器检测” :
正如@caveman 回答的那样,您遇到的问题与 wg.Done() / wg.Wait()
指令排序中的一个问题有关,而且您没有使用 atomic.Load()
来访问 counter.counter
.
对于这种竞争条件,您的方法是“安全的”。
- 避免竞争,如“不要达到计数器不一致的状态” :
你 有问题,(*) 因为你的方法 运行 几个连续的指令来检查和更新对象(例如:if condition { update_fields }
) ,并且当您应用 update_fields
.
condition
是否仍然为真
通过将 incrementor
函数更改为:
func incrementor(wg *sync.WaitGroup) {
for i := 0; i < 20000; i++ {
counter.Inc()
}
counter.Cancel()
wg.Done()
}
和运行将您的程序多次运行,您应该能够看到“Final Counter:”并不总是以 0 (playground) 结尾。
这是如何发生这种情况的说明:
- 假设goroutine 1执行
counter.Cancel()
- 而 goroutine 2 执行
counter.Inc()
可能会出现以下执行顺序:
goroutine 1 goroutine 2
1. if c.isFinished() {
return fmt.Errorf("counter is finished")
}
2. if !c.isFinished() {
3. atomic.StoreUint32(&c.finished, 1)
4. atomic.StoreUint64(&c.counter, 0)
}
5. atomic.AddUint64(&c.counter, 1)
6. return nil
.Inc()
中的c.isFinished()
指令可能发生 在.Cancel()
执行之前,- 并且
atomic.AddUint64(&c.counter, 1)
可能会发生 在.Cancel()
将计数器重置为零之后。
要避免这种竞争,您需要选择一种方式来同步 inspect + update
指令。
一种常见的方法是使用互斥量:
type Counter struct {
counter uint64
finished uint32
m sync.Mutex
}
// Inc increments the counter by one
func (c *Counter) Inc() error {
c.m.Lock()
defer c.m.Unlock()
if c.finished != 0 {
return fmt.Errorf("counter is finished")
}
c.counter++
return nil
}
// Dec decrements the counter by one, but prevents the counter from going to zero
func (c *Counter) Dec() {
c.m.Lock()
defer c.m.Unlock()
// prevent overflow
if c.counter > 0 {
c.counter--
}
}
// Cancel sets the finished flag, and sets counter to zero
func (c *Counter) Cancel() {
c.m.Lock()
defer c.m.Unlock()
if c.finished == 0 {
c.finished = 1
c.counter = 0
}
}
(*) [edit] 我最初写道:“因为你的方法检查对象的两个不同字段”,但即使只有一个字段,你也可能遇到类似的问题.
例如,如果有两个goroutines 运行 下面的代码:
if c.LoadUint64(&c.counter) == 0 {
atomic.AddUint64(&c.counter, 1)
}
您的最终值为 2。
问题在于有多条指令,这些指令不是以原子方式执行的。