如何并发地管理 values/states 并避免竞争条件
How to manage values/states concurrently while avoiding a race condition
在进程启动之后,如何根据 events/conditions 正确地 set/modify 一个值,并且在使用 Goroutines 时不产生竞争条件?
例如,下面的代码"能运行(但有 bug)",输出为:
ping, foo=true
ping, foo=false
ping, foo=true
ping, foo=true
ping, foo=true
https://play.golang.org/p/Y3FafF-nBc
package main
import "fmt"
// test couples a message channel with a boolean flag. The flag is touched
// both by the goroutine started in New and by whoever holds the returned
// pointer.
type test struct {
ch chan string
foo bool
}
// run receives from t.ch forever. For each message it prints the message
// together with the current foo and then resets foo to false.
// The empty default case makes the select non-blocking, so the loop spins
// instead of waiting when no message is pending.
func (t *test) run() {
for {
select {
case v := <-t.ch:
fmt.Printf("%+v, foo=%+v\n", v, t.foo)
// Plain write to foo: no lock or atomic operation protects it.
t.foo = false
default:
}
}
}
// Ping sends "ping" on the unbuffered channel, so it blocks until run
// receives the message.
func (t *test) Ping() {
t.ch <- "ping"
}
// New builds a test with an unbuffered channel, starts run in its own
// goroutine and returns the pointer. foo starts at its zero value, false.
func New() *test {
t := &test{
ch: make(chan string),
}
go t.run()
return t
}
// main pings whenever foo is set and sets foo on every third iteration
// (i = 0, 3, 6, 9). Both accesses to t.foo below run on the main goroutine
// without any synchronization, while run writes the same field from its
// own goroutine.
func main() {
t := New()
for i := 0; i <= 10; i++ {
// Unsynchronized read of foo.
if t.foo {
t.Ping()
}
if i%3 == 0 {
// Unsynchronized write of foo.
t.foo = true
}
}
}
但是,如果使用 -race 选项编译或运行,我会得到如下输出:
$ go run -race main.go
ping, foo=true
==================
WARNING: DATA RACE
Write at 0x00c4200761b8 by goroutine 6:
main.(*test).run()
/main.go:16 +0x1fb
Previous read at 0x00c4200761b8 by main goroutine:
main.main()
/main.go:37 +0x5e
Goroutine 6 (running) created at:
main.New()
/main.go:30 +0xd0
main.main()
/main.go:35 +0x33
==================
ping, foo=false
ping, foo=true
ping, foo=true
ping, foo=true
Found 1 data race(s)
exit status 66
因此,我想知道可以使用什么并发模式,在 goroutine 外部和 goroutine 内部修改 foo
的值而不产生竞争条件。
使用互斥体
package main
import (
"sync"
"time"
"fmt"
)
// Shared counter and the mutex that guards every access to it.
var (
	myvar int
	mut   sync.Mutex
)
// main starts one incrementing goroutine and one printing goroutine, pauses
// for a second, and repeats; it never returns.
func main() {
	const pause = time.Second
	for {
		go other()
		go printer()
		time.Sleep(pause)
	}
}
// other adds one to myvar while holding mut.
func other() {
	mut.Lock()
	defer mut.Unlock()
	myvar++
}
// printer writes the current value of myvar to standard output while
// holding mut.
func printer() {
	mut.Lock()
	defer mut.Unlock()
	fmt.Println(myvar)
}
运行(带互斥锁)
$ go build -race t1.go
$ ./t1
1
2
3
4
5
6
7
7
9
10
运行(没有互斥量)
$ go build t2.go
$ go build -race t2.go
$ ./t2
==================
WARNING: DATA RACE
Read at 0x000000580ce8 by goroutine 7:
runtime.convT2E()
/usr/local/go/src/runtime/iface.go:155 +0x0
main.printer()
/.../.../.../GOPATH/t2.go:23 +0x65
Previous write at 0x000000580ce8 by goroutine 6:
main.other()
/.../.../.../GOPATH/t2.go:19 +0x3d
Goroutine 7 (running) created at:
main.main()
/.../.../.../GOPATH/t2.go:13 +0x5a
Goroutine 6 (finished) created at:
main.main()
/.../.../.../GOPATH/t2.go:12 +0x42
==================
1
2
您有以下几种选择:
- 使用 atomic.Value:示例 (1)
- 使用 sync.RWMutex:示例 (3)
- 使用 sync/atomic:示例 (6)
- 仅使用通道和协程:示例 (7)
另见:Use a sync.Mutex or a channel?
1- 您可以使用 atomic.Value
:
A Value provides an atomic load and store of a consistently typed
value. Values can be created as part of other data structures. The
zero value for a Value returns nil from Load. Once Store has been
called, a Value must not be copied.
A Value must not be copied after first use.
如下面这个可运行的示例:
// to test the panic use go build -race
package main
import (
"fmt"
"sync/atomic"
)
// test pairs a message channel with an embedded atomic.Value that holds the
// boolean flag. Every access to the flag goes through Load and Store, so it
// can be shared between goroutines.
type test struct {
	ch chan string
	atomic.Value
}

// run receives from t.ch forever. For each message it prints the message
// together with the currently stored flag and then stores false.
// The empty default case makes the select non-blocking, so the loop spins
// instead of waiting when no message is pending.
func (t *test) run() {
	for {
		select {
		case v := <-t.ch:
			fmt.Printf("%+v, foo=%+v\n", v, t.Load())
			t.Store(false)
		default:
		}
	}
}

// Ping sends "ping" on the unbuffered channel, so it blocks until run
// receives the message.
func (t *test) Ping() {
	t.ch <- "ping"
}

// New builds a test with an unbuffered channel, stores false as the initial
// flag (so Load never returns nil), starts run in its own goroutine and
// returns the pointer.
func New() *test {
	t := &test{
		ch: make(chan string),
	}
	t.Store(false)
	go t.run()
	return t
}
// main pings while the stored flag is true and stores true on iterations
// 0, 3, 6 and 9; every flag access goes through Load/Store.
func main() {
	t := New()
	for i := 0; i < 11; i++ {
		// A failed type assertion yields false, which simply skips the ping.
		if on, _ := t.Load().(bool); on {
			t.Ping()
		}
		// time.Sleep(time.Second)
		if i%3 != 0 {
			continue
		}
		t.Store(true)
	}
}
使用 go build -race 构建后的输出:
ping, foo=true
ping, foo=false
ping, foo=false
ping, foo=false
ping, foo=false
2- func (t *test) run()
的一点改进:
// run handles messages until t.ch is closed: it prints each one next to the
// currently stored flag and then stores false. Ranging over the channel
// blocks while no message is pending.
func (t *test) run() {
	for msg := range t.ch {
		current := t.Load()
		fmt.Printf("%+v, foo=%+v\n", msg, current)
		t.Store(false)
	}
}
3- 您可以使用 sync.RWMutex
和 sync.WaitGroup
,就像这个工作示例:
// to test the panic use go build -race
package main
import (
"fmt"
"sync"
)
// test guards foo with the embedded RWMutex. The embedded WaitGroup lets
// the creator wait for the run goroutine to finish.
type test struct {
	ch  chan string
	foo bool
	sync.RWMutex
	sync.WaitGroup
}

// run handles messages until t.ch is closed. For each message it reads and
// resets foo under the write lock, then prints the message with the value
// foo had before the reset.
func (t *test) run() {
	// Deferred so that Wait is released on every exit path.
	defer t.Done()
	for v := range t.ch {
		t.Lock()
		r := t.foo
		t.foo = false
		t.Unlock()
		// Print outside the critical section to keep the lock short.
		fmt.Printf("%+v, foo=%+v\n", v, r)
	}
}

// Ping sends "ping" on the unbuffered channel, so it blocks until run
// receives the message.
func (t *test) Ping() {
	t.ch <- "ping"
}

// New builds a test with an unbuffered channel, registers the run goroutine
// with the WaitGroup before starting it, and returns the pointer.
func New() *test {
	t := &test{ch: make(chan string)}
	t.Add(1)
	go t.run()
	return t
}
// main reads foo under the read lock, pings when it was set, and sets foo
// under the write lock on iterations 0, 3, 6 and 9. It then closes the
// channel and waits for run to finish.
func main() {
	t := New()
	for i := 0; i < 11; i++ {
		t.RLock()
		set := t.foo
		t.RUnlock()
		if set {
			t.Ping()
		}
		// time.Sleep(time.Second)
		if i%3 != 0 {
			continue
		}
		t.Lock()
		t.foo = true
		t.Unlock()
	}
	close(t.ch)
	t.Wait()
}
输出 go build -race
:
ping, foo=true
ping, foo=true
ping, foo=false
ping, foo=true
ping, foo=false
ping, foo=true
4- 所以让我们按照这个方法 https://talks.golang.org/2013/bestpractices.slide#29:
原码:
package main
import (
"fmt"
"time"
)
// Server runs a periodic task in a background goroutine until Stop is
// called. The quit channel carries both the stop request and the reply.
type Server struct {
	quit chan bool
}

// NewServer creates a Server with an unbuffered quit channel and starts its
// run loop in a new goroutine.
func NewServer() *Server {
	srv := &Server{quit: make(chan bool)}
	go srv.run()
	return srv
}

// run prints "running task" each time a one-second timer fires. When a
// value arrives on quit it finishes up, sends a reply on quit and returns.
func (s *Server) run() {
	for {
		select {
		case <-time.After(time.Second):
			fmt.Println("running task")
		case <-s.quit:
			fmt.Println("finishing task")
			time.Sleep(time.Second)
			fmt.Println("task done")
			s.quit <- true
			return
		}
	}
}

// Stop asks the run loop to finish and blocks until it has replied.
func (s *Server) Stop() {
	fmt.Println("server stopping")
	s.quit <- true // request shutdown
	<-s.quit       // wait for run's reply
	fmt.Println("server stopped")
}
// main lets the server run for two seconds and then stops it.
func main() {
	const uptime = 2 * time.Second
	srv := NewServer()
	time.Sleep(uptime)
	srv.Stop()
}
5- 让我们简化一下:
package main
import (
"fmt"
"time"
)
// quit carries the stop request from main to run and, in the opposite
// direction, run's reply once it has finished.
var quit = make(chan bool)
// main starts run, waits two seconds, requests shutdown over quit and
// blocks until run has replied.
func main() {
	const uptime = 2 * time.Second
	go run()
	time.Sleep(uptime)

	// Printed last, once the reply below has been received.
	defer fmt.Println("server stopped")
	fmt.Println("server stopping")
	quit <- true // signal to quit
	<-quit       // wait for quit signal
}
// run prints "running task" each time a one-second timer fires. When a
// value arrives on quit it finishes up, sends a reply on quit and returns.
func run() {
	for {
		select {
		case <-time.After(time.Second):
			fmt.Println("running task")
		case <-quit:
			fmt.Println("finishing task")
			time.Sleep(time.Second)
			fmt.Println("task done")
			quit <- true // reply to the goroutine that asked us to stop
			return
		}
	}
}
输出:
running task
running task
server stopping
finishing task
task done
server stopped
6- 样本的简化版本:
// to test the panic use go build -race
package main
import "fmt"
import "sync/atomic"
// ch carries messages from main to run; state is the flag, accessed only
// through sync/atomic (0 = off, 1 = on).
var (
	ch    = make(chan string)
	state int32
)
// main sends "ping" while state is 1 and sets state to 1 on iterations
// 0, 3, 6 and 9, using atomic loads and stores for every access.
func main() {
	go run()
	for i := 0; i < 11; i++ {
		if atomic.LoadInt32(&state) == 1 {
			ch <- "ping"
		}
		if i%3 != 0 {
			continue
		}
		atomic.StoreInt32(&state, 1)
	}
}
// run handles messages until ch is closed: it prints each one next to the
// atomically loaded state and then atomically resets state to 0.
func run() {
	for msg := range ch {
		current := atomic.LoadInt32(&state)
		fmt.Printf("%+v, state=%+v\n", msg, current)
		atomic.StoreInt32(&state, 0)
	}
}
输出:
ping, state=1
ping, state=0
ping, state=1
ping, state=0
ping, state=1
ping, state=0
7- 只使用通道、不使用 Lock() 的可运行示例
(The Go Playground):
// to test the panic use go build -race
package main
import "fmt"
// main never touches the flag directly. It asks run for the current value
// via signal/read, pings when that value is true, and sends true on write
// on iterations 0, 3, 6 and 9.
func main() {
	go run()
	for i := 0; i < 11; i++ {
		signal <- struct{}{}
		if current := <-read; current {
			ping <- "ping"
		}
		if i%3 != 0 {
			continue
		}
		write <- true
	}
}
// run keeps the flag in a local variable and serves it over channels, so
// no other goroutine accesses it. A signal is answered with the current
// value on read, a write replaces the value, and a ping prints the value
// and resets it to false.
func run() {
	var foo bool
	for {
		select {
		case <-signal:
			fmt.Println("signal", foo)
			read <- foo
		case next := <-write:
			foo = next
			fmt.Println("write", foo)
		case msg := <-ping:
			fmt.Println(msg, foo)
			foo = false
		}
	}
}
// Unbuffered channels connecting main to run.
var (
	signal = make(chan struct{}) // main asks run for the current flag
	read   = make(chan bool)     // run answers with the current flag
	write  = make(chan bool)     // main sends a new flag value
	ping   = make(chan string)   // main sends a message to print
)
输出:
signal false
write true
signal true
ping true
signal false
signal false
write true
signal true
ping true
signal false
signal false
write true
signal true
ping true
signal false
signal false
write true
signal true
ping true
在进程启动之后,如何根据 events/conditions 正确地 set/modify 一个值,并且在使用 Goroutines 时不产生竞争条件?
例如,下面的代码"能运行(但有 bug)",输出为:
ping, foo=true
ping, foo=false
ping, foo=true
ping, foo=true
ping, foo=true
https://play.golang.org/p/Y3FafF-nBc
package main
import "fmt"
// test couples a message channel with a boolean flag. The flag is touched
// both by the goroutine started in New and by whoever holds the returned
// pointer.
type test struct {
ch chan string
foo bool
}
// run receives from t.ch forever. For each message it prints the message
// together with the current foo and then resets foo to false.
// The empty default case makes the select non-blocking, so the loop spins
// instead of waiting when no message is pending.
func (t *test) run() {
for {
select {
case v := <-t.ch:
fmt.Printf("%+v, foo=%+v\n", v, t.foo)
// Plain write to foo: no lock or atomic operation protects it.
t.foo = false
default:
}
}
}
// Ping sends "ping" on the unbuffered channel, so it blocks until run
// receives the message.
func (t *test) Ping() {
t.ch <- "ping"
}
// New builds a test with an unbuffered channel, starts run in its own
// goroutine and returns the pointer. foo starts at its zero value, false.
func New() *test {
t := &test{
ch: make(chan string),
}
go t.run()
return t
}
// main pings whenever foo is set and sets foo on every third iteration
// (i = 0, 3, 6, 9). Both accesses to t.foo below run on the main goroutine
// without any synchronization, while run writes the same field from its
// own goroutine.
func main() {
t := New()
for i := 0; i <= 10; i++ {
// Unsynchronized read of foo.
if t.foo {
t.Ping()
}
if i%3 == 0 {
// Unsynchronized write of foo.
t.foo = true
}
}
}
但是,如果使用 -race 选项编译或运行,我会得到如下输出:
$ go run -race main.go
ping, foo=true
==================
WARNING: DATA RACE
Write at 0x00c4200761b8 by goroutine 6:
main.(*test).run()
/main.go:16 +0x1fb
Previous read at 0x00c4200761b8 by main goroutine:
main.main()
/main.go:37 +0x5e
Goroutine 6 (running) created at:
main.New()
/main.go:30 +0xd0
main.main()
/main.go:35 +0x33
==================
ping, foo=false
ping, foo=true
ping, foo=true
ping, foo=true
Found 1 data race(s)
exit status 66
因此,我想知道可以使用什么并发模式,在 goroutine 外部和 goroutine 内部修改 foo
的值而不产生竞争条件。
使用互斥体
package main
import (
"sync"
"time"
"fmt"
)
// Shared counter and the mutex that guards every access to it.
var (
	myvar int
	mut   sync.Mutex
)
// main starts one incrementing goroutine and one printing goroutine, pauses
// for a second, and repeats; it never returns.
func main() {
	const pause = time.Second
	for {
		go other()
		go printer()
		time.Sleep(pause)
	}
}
// other adds one to myvar while holding mut.
func other() {
	mut.Lock()
	defer mut.Unlock()
	myvar++
}
// printer writes the current value of myvar to standard output while
// holding mut.
func printer() {
	mut.Lock()
	defer mut.Unlock()
	fmt.Println(myvar)
}
运行(带互斥锁)
$ go build -race t1.go
$ ./t1
1
2
3
4
5
6
7
7
9
10
运行(没有互斥量)
$ go build t2.go
$ go build -race t2.go
$ ./t2
==================
WARNING: DATA RACE
Read at 0x000000580ce8 by goroutine 7:
runtime.convT2E()
/usr/local/go/src/runtime/iface.go:155 +0x0
main.printer()
/.../.../.../GOPATH/t2.go:23 +0x65
Previous write at 0x000000580ce8 by goroutine 6:
main.other()
/.../.../.../GOPATH/t2.go:19 +0x3d
Goroutine 7 (running) created at:
main.main()
/.../.../.../GOPATH/t2.go:13 +0x5a
Goroutine 6 (finished) created at:
main.main()
/.../.../.../GOPATH/t2.go:12 +0x42
==================
1
2
您有以下几种选择:
- 使用 atomic.Value:示例 (1)
- 使用 sync.RWMutex:示例 (3)
- 使用 sync/atomic:示例 (6)
- 仅使用通道和协程:示例 (7)
另见:Use a sync.Mutex or a channel?
1- 您可以使用 atomic.Value
:
A Value provides an atomic load and store of a consistently typed value. Values can be created as part of other data structures. The zero value for a Value returns nil from Load. Once Store has been called, a Value must not be copied.
A Value must not be copied after first use.
如下面这个可运行的示例:
// to test the panic use go build -race
package main
import (
"fmt"
"sync/atomic"
)
// test pairs a message channel with an embedded atomic.Value that holds the
// boolean flag. Every access to the flag goes through Load and Store, so it
// can be shared between goroutines.
type test struct {
	ch chan string
	atomic.Value
}

// run receives from t.ch forever. For each message it prints the message
// together with the currently stored flag and then stores false.
// The empty default case makes the select non-blocking, so the loop spins
// instead of waiting when no message is pending.
func (t *test) run() {
	for {
		select {
		case v := <-t.ch:
			fmt.Printf("%+v, foo=%+v\n", v, t.Load())
			t.Store(false)
		default:
		}
	}
}

// Ping sends "ping" on the unbuffered channel, so it blocks until run
// receives the message.
func (t *test) Ping() {
	t.ch <- "ping"
}

// New builds a test with an unbuffered channel, stores false as the initial
// flag (so Load never returns nil), starts run in its own goroutine and
// returns the pointer.
func New() *test {
	t := &test{
		ch: make(chan string),
	}
	t.Store(false)
	go t.run()
	return t
}
// main pings while the stored flag is true and stores true on iterations
// 0, 3, 6 and 9; every flag access goes through Load/Store.
func main() {
	t := New()
	for i := 0; i < 11; i++ {
		// A failed type assertion yields false, which simply skips the ping.
		if on, _ := t.Load().(bool); on {
			t.Ping()
		}
		// time.Sleep(time.Second)
		if i%3 != 0 {
			continue
		}
		t.Store(true)
	}
}
使用 go build -race 构建后的输出:
ping, foo=true
ping, foo=false
ping, foo=false
ping, foo=false
ping, foo=false
2- func (t *test) run()
的一点改进:
// run handles messages until t.ch is closed: it prints each one next to the
// currently stored flag and then stores false. Ranging over the channel
// blocks while no message is pending.
func (t *test) run() {
	for msg := range t.ch {
		current := t.Load()
		fmt.Printf("%+v, foo=%+v\n", msg, current)
		t.Store(false)
	}
}
3- 您可以使用 sync.RWMutex
和 sync.WaitGroup
,就像这个工作示例:
// to test the panic use go build -race
package main
import (
"fmt"
"sync"
)
// test guards foo with the embedded RWMutex. The embedded WaitGroup lets
// the creator wait for the run goroutine to finish.
type test struct {
	ch  chan string
	foo bool
	sync.RWMutex
	sync.WaitGroup
}

// run handles messages until t.ch is closed. For each message it reads and
// resets foo under the write lock, then prints the message with the value
// foo had before the reset.
func (t *test) run() {
	// Deferred so that Wait is released on every exit path.
	defer t.Done()
	for v := range t.ch {
		t.Lock()
		r := t.foo
		t.foo = false
		t.Unlock()
		// Print outside the critical section to keep the lock short.
		fmt.Printf("%+v, foo=%+v\n", v, r)
	}
}

// Ping sends "ping" on the unbuffered channel, so it blocks until run
// receives the message.
func (t *test) Ping() {
	t.ch <- "ping"
}

// New builds a test with an unbuffered channel, registers the run goroutine
// with the WaitGroup before starting it, and returns the pointer.
func New() *test {
	t := &test{ch: make(chan string)}
	t.Add(1)
	go t.run()
	return t
}
// main reads foo under the read lock, pings when it was set, and sets foo
// under the write lock on iterations 0, 3, 6 and 9. It then closes the
// channel and waits for run to finish.
func main() {
	t := New()
	for i := 0; i < 11; i++ {
		t.RLock()
		set := t.foo
		t.RUnlock()
		if set {
			t.Ping()
		}
		// time.Sleep(time.Second)
		if i%3 != 0 {
			continue
		}
		t.Lock()
		t.foo = true
		t.Unlock()
	}
	close(t.ch)
	t.Wait()
}
输出 go build -race
:
ping, foo=true
ping, foo=true
ping, foo=false
ping, foo=true
ping, foo=false
ping, foo=true
4- 所以让我们按照这个方法 https://talks.golang.org/2013/bestpractices.slide#29:
原码:
package main
import (
"fmt"
"time"
)
// Server runs a periodic task in a background goroutine until Stop is
// called. The quit channel carries both the stop request and the reply.
type Server struct {
	quit chan bool
}

// NewServer creates a Server with an unbuffered quit channel and starts its
// run loop in a new goroutine.
func NewServer() *Server {
	srv := &Server{quit: make(chan bool)}
	go srv.run()
	return srv
}

// run prints "running task" each time a one-second timer fires. When a
// value arrives on quit it finishes up, sends a reply on quit and returns.
func (s *Server) run() {
	for {
		select {
		case <-time.After(time.Second):
			fmt.Println("running task")
		case <-s.quit:
			fmt.Println("finishing task")
			time.Sleep(time.Second)
			fmt.Println("task done")
			s.quit <- true
			return
		}
	}
}

// Stop asks the run loop to finish and blocks until it has replied.
func (s *Server) Stop() {
	fmt.Println("server stopping")
	s.quit <- true // request shutdown
	<-s.quit       // wait for run's reply
	fmt.Println("server stopped")
}
// main lets the server run for two seconds and then stops it.
func main() {
	const uptime = 2 * time.Second
	srv := NewServer()
	time.Sleep(uptime)
	srv.Stop()
}
5- 让我们简化一下:
package main
import (
"fmt"
"time"
)
// quit carries the stop request from main to run and, in the opposite
// direction, run's reply once it has finished.
var quit = make(chan bool)
// main starts run, waits two seconds, requests shutdown over quit and
// blocks until run has replied.
func main() {
	const uptime = 2 * time.Second
	go run()
	time.Sleep(uptime)

	// Printed last, once the reply below has been received.
	defer fmt.Println("server stopped")
	fmt.Println("server stopping")
	quit <- true // signal to quit
	<-quit       // wait for quit signal
}
// run prints "running task" each time a one-second timer fires. When a
// value arrives on quit it finishes up, sends a reply on quit and returns.
func run() {
	for {
		select {
		case <-time.After(time.Second):
			fmt.Println("running task")
		case <-quit:
			fmt.Println("finishing task")
			time.Sleep(time.Second)
			fmt.Println("task done")
			quit <- true // reply to the goroutine that asked us to stop
			return
		}
	}
}
输出:
running task
running task
server stopping
finishing task
task done
server stopped
6- 样本的简化版本:
// to test the panic use go build -race
package main
import "fmt"
import "sync/atomic"
// ch carries messages from main to run; state is the flag, accessed only
// through sync/atomic (0 = off, 1 = on).
var (
	ch    = make(chan string)
	state int32
)
// main sends "ping" while state is 1 and sets state to 1 on iterations
// 0, 3, 6 and 9, using atomic loads and stores for every access.
func main() {
	go run()
	for i := 0; i < 11; i++ {
		if atomic.LoadInt32(&state) == 1 {
			ch <- "ping"
		}
		if i%3 != 0 {
			continue
		}
		atomic.StoreInt32(&state, 1)
	}
}
// run handles messages until ch is closed: it prints each one next to the
// atomically loaded state and then atomically resets state to 0.
func run() {
	for msg := range ch {
		current := atomic.LoadInt32(&state)
		fmt.Printf("%+v, state=%+v\n", msg, current)
		atomic.StoreInt32(&state, 0)
	}
}
输出:
ping, state=1
ping, state=0
ping, state=1
ping, state=0
ping, state=1
ping, state=0
7- 只使用通道、不使用 Lock() 的可运行示例
(The Go Playground):
// to test the panic use go build -race
package main
import "fmt"
// main never touches the flag directly. It asks run for the current value
// via signal/read, pings when that value is true, and sends true on write
// on iterations 0, 3, 6 and 9.
func main() {
	go run()
	for i := 0; i < 11; i++ {
		signal <- struct{}{}
		if current := <-read; current {
			ping <- "ping"
		}
		if i%3 != 0 {
			continue
		}
		write <- true
	}
}
// run keeps the flag in a local variable and serves it over channels, so
// no other goroutine accesses it. A signal is answered with the current
// value on read, a write replaces the value, and a ping prints the value
// and resets it to false.
func run() {
	var foo bool
	for {
		select {
		case <-signal:
			fmt.Println("signal", foo)
			read <- foo
		case next := <-write:
			foo = next
			fmt.Println("write", foo)
		case msg := <-ping:
			fmt.Println(msg, foo)
			foo = false
		}
	}
}
// Unbuffered channels connecting main to run.
var (
	signal = make(chan struct{}) // main asks run for the current flag
	read   = make(chan bool)     // run answers with the current flag
	write  = make(chan bool)     // main sends a new flag value
	ping   = make(chan string)   // main sends a message to print
)
输出:
signal false
write true
signal true
ping true
signal false
signal false
write true
signal true
ping true
signal false
signal false
write true
signal true
ping true
signal false
signal false
write true
signal true
ping true