与多个 producers/multiple 消费者并发

Concurrency with multiple producers/multiple consumers

我可能遗漏了一些东西,或者不理解 Go 如何处理并发(或者我对并发本身的了解),我设计了一些代码来理解多个 producer/consumer。

这是代码:

package main

import (
    "fmt"
    "time"
    // "math/rand"
    "sync"
)

var seq uint64 = 0
var generatorChan chan uint64
var requestChan chan uint64

func makeTimestamp() int64 {
    return time.Now().UnixNano() / int64(time.Millisecond)
}

func generateStuff(genId int) {
    var crap uint64
    for {
        crap = <-requestChan
        // <- requestChan
        seq = seq+1
        fmt.Println("Gen ", genId, " - From : ", crap, " @", makeTimestamp())
        generatorChan <- uint64(seq)
    }
}

func concurrentPrint(id int, work *sync.WaitGroup) {
    defer work.Done()

    for i := 0; i < 5; i++ {
        requestChan<-uint64(id)
        fmt.Println("Conc", id, ": ", <-generatorChan)
    }
}

func main() {
    generatorChan = make(chan uint64)
    requestChan = make(chan uint64)
    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        go generateStuff(i)
    }
    maximumWorker := 200
    wg.Add(maximumWorker)
    for i := 0; i < maximumWorker; i++ {
        go concurrentPrint(i, &wg)
    }
    wg.Wait()
}

当运行它打印(主要是按顺序)从 1 到 1000 的所有数字(200 个消费者每人得到一个数字 5 次)。 我原以为有些消费者会打印完全相同的数字,但似乎 requestChan 就像一个屏障一样工作,即使有 20 个 goroutines 服务于 generateStuff 通过增加一个全局变量来生成数字。

一般来说,我对 Go 或并发有什么误解?

我预计会出现类似两个类型为 generateStuff 的例程的情况,它们会一起醒来并同时增加 seq,从而有两个消费者打印相同的数字两次。

编辑 playgolang 代码:http://play.golang.org/p/eRzNXjdxtZ

Channel types

A channel provides a mechanism for concurrently executing functions to communicate by sending and receiving values of a specified element type. The value of an uninitialized channel is nil.

A new, initialized channel value can be made using the built-in function make, which takes the channel type and an optional capacity as arguments:

make(chan int, 100)

The capacity, in number of elements, sets the size of the buffer in the channel. If the capacity is zero or absent, the channel is unbuffered and communication succeeds only when both a sender and receiver are ready. Otherwise, the channel is buffered and communication succeeds without blocking if the buffer is not full (sends) or not empty (receives). A nil channel is never ready for communication.

您正在使用无缓冲通道限制通道通信。

例如,

generatorChan = make(chan uint64)
requestChan = make(chan uint64)

您有多个工作人员可以同时 运行 并同时尝试和发出请求。由于 requestChan 是无缓冲的,他们都阻塞等待 reader 同步并接受他们的请求。

您有多个生成器,它们将通过 requestChan 与请求者同步,产生结果,然后阻塞在未缓冲的 generatorChan 上,直到工作人员读取结果。请注意,它可能是不同的工人。

没有额外的同步,所以其他一切都是不确定的。

  • 一个生成器可以处理所有请求。
  • 生成器可以获取请求并通过递增 seq 在任何其他生成器碰巧有机会 运行 之前。如果只有一个处理器,这甚至是可能的。
  • 所有生成器都可以获取请求,但最终都想在同一时间递增 seq,从而导致各种问题。
  • 工作人员可以从他们碰巧发送到完全不同的发电机的同一台发电机获得响应。

一般来说,如果不添加同步来强制执行这些行为之一,您就无法确保其中任何一个确实发生。

请注意,数据竞争本身就是另一个不确定性事件。有可能获得任意值、程序崩溃等。假设在竞争条件下该值可能仅因一个或某些此类相对无害的结果而偏离是不安全的。

对于实验,您最好的办法就是加速 GOMAXPROCS。通过环境变量(例如 env GOMAXPROCS=16 go run foo.gogo build 之后的 env GOMAXPROCS=16 ./foo 之类的变量)或从程序中调用 runtime.GOMAXPROCS(16)。默认值为 1,这意味着可以隐藏数据竞争或其他 "strange" 行为。

您还可以通过在不同点添加对 runtime.Goschedtime.Sleep 的调用来稍微影响事情。

如果您使用竞争检测器(例如 go run -race foo.googo build -race),您也可以看到数据竞争。程序不仅应该在退出时显示 "Found 1 data race(s)",而且还应该在首次检测到竞争时转储大量带有堆栈跟踪的详细信息。

这是您的实验代码的 "cleaned up" 版本:

package main

import (
    "log"
    "sync"
    "sync/atomic"
)

var seq uint64 = 0
var generatorChan = make(chan uint64)
var requestChan = make(chan uint64)

func generator(genID int) {
    for reqID := range requestChan {
        // If you want to see a data race:
        //seq = seq + 1
        // Else:
        s := atomic.AddUint64(&seq, 1)
        log.Printf("Gen: %2d, from %3d", genID, reqID)
        generatorChan <- s
    }
}

func worker(id int, work *sync.WaitGroup) {
    defer work.Done()

    for i := 0; i < 5; i++ {
        requestChan <- uint64(id)
        log.Printf("\t\t\tWorker: %3d got %4d", id, <-generatorChan)
    }
}

func main() {
    log.SetFlags(log.Lmicroseconds)
    const (
        numGen    = 20
        numWorker = 200
    )
    var wg sync.WaitGroup
    for i := 0; i < numGen; i++ {
        go generator(i)
    }
    wg.Add(numWorker)
    for i := 0; i < numWorker; i++ {
        go worker(i, &wg)
    }
    wg.Wait()
    close(requestChan)
}

Playground(但请注意,playground 上的时间戳没有用,调用 runtime.MAXPROCS 可能不会执行任何操作)。进一步注意 playground 缓存结果,因此重新 运行 完全相同的程序将始终显示相同的输出,你需要做一些小的改变或者只是 运行 在你自己的机器上。

很多小的变化,比如分流生成器,使用 logfmt 因为前者提供并发保证,消除数据竞争,使输出看起来更好等。