Golang - 使用 go-routine 的竞争条件
Golang - race condition using go-routine
我尝试在我的程序中使用竞赛标志并发现问题:(
函数如下
func (g *WaitingErrorGroup) Start(run func() (bool, error)) {
g.g.Start(func() {
requeue, err := run()
if g.err == nil {
g.err = err
}
if requeue {
g.requeue = requeue
}
})
}
函数调用如下
g.Start(func() (bool, error) {
return install(vins, crObjectKey, releasePrefix, kFilePath, objectCli, dependenciesBroadcastingSchema, compStatus)
})
g.Start(func() (bool, error) {
return false, uninstall(currentRelease, kFilePath, updateChartStatus)
})
堆栈跟踪如下所示
WARNING: DATA RACE
Read at 0x00c0001614a8 by goroutine 82:
github.vs.sar/agm/coperator/components/tools.(*WaitingErrorGroup).Start.func1()
/Users/github.vs.sar/agm/coperator/components/tools/waitgroup.go:27 +0x84
k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1()
/Users/i88893/go/pkg/mod/k8s.io/apimachinery@v0.22.4/pkg/util/wait/wait.go:73 +0x6d
启动函数是这样的:(我的代码) github.vs.sar/agm/coperator/components/tools.(*WaitingErrorGroup).Start.func1()
func (g *WaitingErrorGroup) Start(run func() (bool, error)) {
g.g.Start(func() {
requeue, err := run()
if g.err == nil {
g.err = err
}
if requeue {
g.requeue = requeue
}
})
}
堆栈跟踪中的第二个是这个(不是我的代码)
go/pkg/mod/k8s.io/apimachinery@v0.22.4/pkg/util/wait/wait.go:73 +0x6d
// Start 在组中的一个新 goroutine 中启动 f。
func (g *Group) Start(f func()) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
f()
}()
}
我猜(不是 go 专家)这与同时使用多个 goroutines
中的 g.err
有关,这是不允许的。同样写 g.requeue
知道如何解决这个问题吗?
也许我需要使用 https://pkg.go.dev/sync#RWMutex
但不确定如何...
更新
我采纳了@Danil 的建议(更改锁定位置)并按如下方式更改
在结构中添加互斥锁并在函数中添加锁,是否有意义?现在,当我 运行 带有种族标志时,一切似乎都正常
type WaitingErrorGroup struct {
g *wait.Group
mu sync.Mutex
err error
requeue bool
}
func (g *WaitingErrorGroup) Start(run func() (bool, error)) {
g.g.Start(func() {
g.mu.Lock()
defer g.mu.Unlock()
requeue, err := run()
if g.err == nil {
g.err = err
}
if requeue {
g.requeue = requeue
}
})
}
出现问题是因为您试图从不同的 goroutine 操作未同步的共享内存(g.err
在您的情况下)。
您有两种不同的方法 handle concurrent code in go:
- 用于共享内存的同步原语(例如,sync.Mutex)
- 通过通信(例如频道)同步
您的代码似乎遵循 Synchronization primitives for sharing memory
并解决您需要同步访问 g.err
的错误。
您可以使用sync.Mutex and sync.RWMutex
在您的情况下,您将拥有:
func (g *WaitingErrorGroup) Start(run func() (bool, error)) {
g.g.Start(func() {
requeue, err := run()
// Lock before reading and writing g.err and unlock after
g.mu.Lock()
defer g.mu.Unlock()
if g.err == nil {
g.err = err
}
if requeue {
g.requeue = requeue
}
})
}
然而,根据 Robe Pike more idiomatic way 的说法,Go 是 Don't communicate by sharing memory; share memory by communicating.
(这意味着使用通道,而不是互斥体)。 @TheFool 和@kostix 已经提到了它
在评论中。
但是我不清楚你是否有可能重新设计你的代码以遵循这个习惯用法这个问题。
您可以使用通道来传达发生的错误并进行处理。
例如,像这样的东西。
func handleErrors(c chan error) *sync.WaitGroup {
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for err := range c {
fmt.Println(err)
}
}()
return &wg
}
func main() {
c := make(chan error, 2)
wg := sync.WaitGroup{}
defer handleErrors(c).Wait()
defer close(c)
defer wg.Wait()
wg.Add(2)
go func() {
defer wg.Done()
c <- errors.New("error 1")
}()
go func() {
defer wg.Done()
c <- errors.New("error 2")
}()
}
我认为在 go 中使用通道比其他同步原语(如锁)更为惯用。锁定更难正确,并且可能会带来性能成本。
如果一个go routine有锁,其他goroutines必须等到锁被释放。因此,您在并发执行中引入了瓶颈。在上面的示例中,这是通过缓冲通道来解决的。即使还没有人读取消息,两个 goroutines 仍然能够传递他们的消息而不会被阻塞。
另外,在使用锁的时候,可能会出现锁一直没有释放的情况,比如程序员忘记添加相关行,导致死锁的情况。虽然在不关闭通道的情况下也会发生类似的坏事。
我尝试在我的程序中使用竞赛标志并发现问题:(
函数如下
func (g *WaitingErrorGroup) Start(run func() (bool, error)) {
g.g.Start(func() {
requeue, err := run()
if g.err == nil {
g.err = err
}
if requeue {
g.requeue = requeue
}
})
}
函数调用如下
g.Start(func() (bool, error) {
return install(vins, crObjectKey, releasePrefix, kFilePath, objectCli, dependenciesBroadcastingSchema, compStatus)
})
g.Start(func() (bool, error) {
return false, uninstall(currentRelease, kFilePath, updateChartStatus)
})
堆栈跟踪如下所示
WARNING: DATA RACE
Read at 0x00c0001614a8 by goroutine 82:
github.vs.sar/agm/coperator/components/tools.(*WaitingErrorGroup).Start.func1()
/Users/github.vs.sar/agm/coperator/components/tools/waitgroup.go:27 +0x84
k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1()
/Users/i88893/go/pkg/mod/k8s.io/apimachinery@v0.22.4/pkg/util/wait/wait.go:73 +0x6d
启动函数是这样的:(我的代码) github.vs.sar/agm/coperator/components/tools.(*WaitingErrorGroup).Start.func1()
func (g *WaitingErrorGroup) Start(run func() (bool, error)) {
g.g.Start(func() {
requeue, err := run()
if g.err == nil {
g.err = err
}
if requeue {
g.requeue = requeue
}
})
}
堆栈跟踪中的第二个是这个(不是我的代码)
go/pkg/mod/k8s.io/apimachinery@v0.22.4/pkg/util/wait/wait.go:73 +0x6d
// Start 在组中的一个新 goroutine 中启动 f。
func (g *Group) Start(f func()) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
f()
}()
}
我猜(不是 go 专家)这与同时使用多个 goroutines
中的 g.err
有关,这是不允许的。同样写 g.requeue
知道如何解决这个问题吗?
也许我需要使用 https://pkg.go.dev/sync#RWMutex 但不确定如何...
更新
我采纳了@Danil 的建议(更改锁定位置)并按如下方式更改 在结构中添加互斥锁并在函数中添加锁,是否有意义?现在,当我 运行 带有种族标志时,一切似乎都正常
type WaitingErrorGroup struct {
g *wait.Group
mu sync.Mutex
err error
requeue bool
}
func (g *WaitingErrorGroup) Start(run func() (bool, error)) {
g.g.Start(func() {
g.mu.Lock()
defer g.mu.Unlock()
requeue, err := run()
if g.err == nil {
g.err = err
}
if requeue {
g.requeue = requeue
}
})
}
出现问题是因为您试图从不同的 goroutine 操作未同步的共享内存(g.err
在您的情况下)。
您有两种不同的方法 handle concurrent code in go:
- 用于共享内存的同步原语(例如,sync.Mutex)
- 通过通信(例如频道)同步
您的代码似乎遵循 Synchronization primitives for sharing memory
并解决您需要同步访问 g.err
的错误。
您可以使用sync.Mutex and sync.RWMutex
在您的情况下,您将拥有:
func (g *WaitingErrorGroup) Start(run func() (bool, error)) {
g.g.Start(func() {
requeue, err := run()
// Lock before reading and writing g.err and unlock after
g.mu.Lock()
defer g.mu.Unlock()
if g.err == nil {
g.err = err
}
if requeue {
g.requeue = requeue
}
})
}
然而,根据 Robe Pike more idiomatic way 的说法,Go 是 Don't communicate by sharing memory; share memory by communicating.
(这意味着使用通道,而不是互斥体)。 @TheFool 和@kostix 已经提到了它
在评论中。
但是我不清楚你是否有可能重新设计你的代码以遵循这个习惯用法这个问题。
您可以使用通道来传达发生的错误并进行处理。
例如,像这样的东西。
func handleErrors(c chan error) *sync.WaitGroup {
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for err := range c {
fmt.Println(err)
}
}()
return &wg
}
func main() {
c := make(chan error, 2)
wg := sync.WaitGroup{}
defer handleErrors(c).Wait()
defer close(c)
defer wg.Wait()
wg.Add(2)
go func() {
defer wg.Done()
c <- errors.New("error 1")
}()
go func() {
defer wg.Done()
c <- errors.New("error 2")
}()
}
我认为在 go 中使用通道比其他同步原语(如锁)更为惯用。锁定更难正确,并且可能会带来性能成本。
如果一个go routine有锁,其他goroutines必须等到锁被释放。因此,您在并发执行中引入了瓶颈。在上面的示例中,这是通过缓冲通道来解决的。即使还没有人读取消息,两个 goroutines 仍然能够传递他们的消息而不会被阻塞。
另外,在使用锁的时候,可能会出现锁一直没有释放的情况,比如程序员忘记添加相关行,导致死锁的情况。虽然在不关闭通道的情况下也会发生类似的坏事。