同时,如何管理 values/states 并避免竞争条件

concurrently, how to manage values/states and avoiding a race condition

如何正确地 set/modify 一个基于 events/conditions 的值,该值在进程启动后发生,同时处理 Goroutines 而不创建竞争条件。

例如下面的"works (is buggy)",输出为:

ping, foo=true
ping, foo=false
ping, foo=true
ping, foo=true
ping, foo=true

https://play.golang.org/p/Y3FafF-nBc

package main

import "fmt"

type test struct {
    ch  chan string
    foo bool
}

func (t *test) run() {
    for {
        select {
        case v := <-t.ch:
            fmt.Printf("%+v, foo=%+v\n", v, t.foo)
            t.foo = false
        default:
        }
    }
}

func (t *test) Ping() {
    t.ch <- "ping"
}

func New() *test {
    t := &test{
        ch: make(chan string),
    }
    go t.run()
    return t
}

func main() {
    t := New()
    for i := 0; i <= 10; i++ {
        if t.foo {
            t.Ping()
        }
        if i%3 == 0 {
            t.foo = true
        }
    }
}

但是如果编译或 运行 使用 -race 选项,我得到这个输出:

$ go run -race main.go
ping, foo=true
==================
WARNING: DATA RACE
Write at 0x00c4200761b8 by goroutine 6:
  main.(*test).run()
      /main.go:16 +0x1fb

Previous read at 0x00c4200761b8 by main goroutine:
  main.main()
      /main.go:37 +0x5e

Goroutine 6 (running) created at:
  main.New()
      /main.go:30 +0xd0
  main.main()
      /main.go:35 +0x33
==================
ping, foo=false
ping, foo=true
ping, foo=true
ping, foo=true
Found 1 data race(s)
exit status 66

因此,我想知道我可以使用什么 并发模式 来更改 gorutine 外部和 gorutine 内部的 foo 的值而不用创造竞争条件。

使用互斥体

package main

import (
    "sync"
    "time"
    "fmt"
)

var myvar int
var mut sync.Mutex

func main() {
    for {
        go other()
        go printer()
        time.Sleep(time.Duration(1) * time.Second)
    }
}

func other() {
    mut.Lock()
    myvar = myvar +1
    mut.Unlock()
}

func printer() {
    mut.Lock()
    fmt.Println(myvar)
    mut.Unlock()
}

运行(带互斥锁)

$ go build -race t1.go 
$ ./t1 
1
2
3
4
5
6
7
7
9
10

运行(没有互斥量)

$ go build t2.go 
$ go build -race t2.go 
$ ./t2 
==================
WARNING: DATA RACE
Read at 0x000000580ce8 by goroutine 7:
  runtime.convT2E()
      /usr/local/go/src/runtime/iface.go:155 +0x0
  main.printer()
      /.../.../.../GOPATH/t2.go:23 +0x65

Previous write at 0x000000580ce8 by goroutine 6:
  main.other()
      /.../.../.../GOPATH/t2.go:19 +0x3d

Goroutine 7 (running) created at:
  main.main()
      /.../.../.../GOPATH/t2.go:13 +0x5a

Goroutine 6 (finished) created at:
  main.main()
      /.../.../.../GOPATH/t2.go:12 +0x42
==================
1
2

您有一些选择:

  • 使用 atomic.Value:示例 (1)
  • 使用 sync.RWMutex:示例 (3)
  • 使用 sync/atomic:示例 (6)
  • 仅使用通道和协程:示例 (7)

另见:Use a sync.Mutex or a channel?


1- 您可以使用 atomic.Value:

A Value provides an atomic load and store of a consistently typed value. Values can be created as part of other data structures. The zero value for a Value returns nil from Load. Once Store has been called, a Value must not be copied.

A Value must not be copied after first use.

喜欢这个工作示例:

// to test the panic use go build -race
package main

import (
    "fmt"
    "sync/atomic"
)

type test struct {
    ch chan string
    atomic.Value
}

func (t *test) run() {
    for {
        select {
        case v := <-t.ch:
            fmt.Printf("%+v, foo=%+v\n", v, t.Load())
            t.Store(false)
        default:
        }
    }
}

func (self *test) Ping() {
    self.ch <- "ping"
}

func New() *test {
    t := &test{
        ch: make(chan string),
    }
    t.Store(false)
    go t.run()
    return t
}

func main() {
    t := New()
    for i := 0; i <= 10; i++ {
        if x, _ := t.Load().(bool); x {
            t.Ping()
        }
        //  time.Sleep(time.Second)
        if i%3 == 0 {
            t.Store(true)
        }
    }
}

输出 go build -race:

ping, foo=true
ping, foo=false
ping, foo=false
ping, foo=false
ping, foo=false

2- func (t *test) run() 的一点改进:

func (t *test) run() {
    for v := range t.ch {
        fmt.Printf("%+v, foo=%+v\n", v, t.Load())
        t.Store(false)
    }
}

3- 您可以使用 sync.RWMutexsync.WaitGroup,就像这个工作示例:

// to test the panic use go build -race
package main

import (
    "fmt"
    "sync"
)

type test struct {
    ch  chan string
    foo bool
    sync.RWMutex
    sync.WaitGroup
}

func (t *test) run() {
    for v := range t.ch {
        t.Lock()
        r := t.foo
        t.foo = false
        t.Unlock()
        fmt.Printf("%+v, foo=%+v\n", v, r)

    }
    t.Done()
}

func (self *test) Ping() {
    self.ch <- "ping"
}

func New() *test {
    t := &test{ch: make(chan string)}
    t.Add(1)
    go t.run()
    return t
}

func main() {
    t := New()
    for i := 0; i <= 10; i++ {
        t.RLock()
        r := t.foo
        t.RUnlock()
        if r {
            t.Ping()
        }
        //  time.Sleep(time.Second)
        if i%3 == 0 {
            t.Lock()
            t.foo = true
            t.Unlock()
        }
    }
    close(t.ch)
    t.Wait()
}

输出 go build -race:

ping, foo=true
ping, foo=true
ping, foo=false
ping, foo=true
ping, foo=false
ping, foo=true

4- 所以让我们按照这个方法 https://talks.golang.org/2013/bestpractices.slide#29:
原码:

package main

import (
    "fmt"
    "time"
)

type Server struct{ quit chan bool }

func NewServer() *Server {
    s := &Server{make(chan bool)}
    go s.run()
    return s
}

func (s *Server) run() {
    for {
        select {
        case <-s.quit:
            fmt.Println("finishing task")
            time.Sleep(time.Second)
            fmt.Println("task done")
            s.quit <- true
            return
        case <-time.After(time.Second):
            fmt.Println("running task")
        }
    }
}
func (s *Server) Stop() {
    fmt.Println("server stopping")
    s.quit <- true
    <-s.quit
    fmt.Println("server stopped")
}

func main() {
    s := NewServer()
    time.Sleep(2 * time.Second)
    s.Stop()
}

5- 让我们简化一下:

package main

import (
    "fmt"
    "time"
)

var quit = make(chan bool)

func main() {
    go run()
    time.Sleep(2 * time.Second)
    fmt.Println("server stopping")

    quit <- true // signal to quit

    <-quit // wait for quit signal

    fmt.Println("server stopped")
}

func run() {
    for {
        select {
        case <-quit:
            fmt.Println("finishing task")
            time.Sleep(time.Second)
            fmt.Println("task done")
            quit <- true
            return
        case <-time.After(time.Second):
            fmt.Println("running task")
        }
    }
}

输出:

running task
running task
server stopping
finishing task
task done
server stopped

6- 样本的简化版本:

// to test the panic use go build -race
package main

import "fmt"
import "sync/atomic"

var ch = make(chan string)
var state int32

func main() {
    go run()
    for i := 0; i <= 10; i++ {
        if atomic.LoadInt32(&state) == 1 {
            ch <- "ping"
        }
        if i%3 == 0 {
            atomic.StoreInt32(&state, 1)
        }
    }
}

func run() {
    for v := range ch {
        fmt.Printf("%+v, state=%+v\n", v, atomic.LoadInt32(&state))
        atomic.StoreInt32(&state, 0)
    }
}

输出:

ping, state=1
ping, state=0
ping, state=1
ping, state=0
ping, state=1
ping, state=0

7- 带通道但不使用 Lock() (The Go Playground) 的工作样本:

// to test the panic use go build -race
package main

import "fmt"

func main() {
    go run()
    for i := 0; i <= 10; i++ {
        signal <- struct{}{}
        if <-read {
            ping <- "ping"
        }
        if i%3 == 0 {
            write <- true
        }
    }
}

func run() {
    foo := false
    for {
        select {
        case <-signal:
            fmt.Println("signal", foo)
            read <- foo
        case foo = <-write:
            fmt.Println("write", foo)
        case v := <-ping:
            fmt.Println(v, foo)
            foo = false
        }
    }
}

var (
    ping   = make(chan string)
    signal = make(chan struct{})
    read   = make(chan bool)
    write  = make(chan bool)
)

输出:

signal false
write true
signal true
ping true
signal false
signal false
write true
signal true
ping true
signal false
signal false
write true
signal true
ping true
signal false
signal false
write true
signal true
ping true