使用通道作为队列的死锁

Deadlock using channels as queues

我正在学习 Go,我正在尝试实现一个作业队列。

我想做的是:

让主要的 goroutine feed lines 通过多个解析器工作者的通道(将一行解析为 s 结构),并让每个解析器将结构发送到其他工作者(goroutines)将处理的结构通道(发送到数据库等)。

代码如下所示:

lineParseQ := make(chan string, 5)
jobProcessQ := make(chan myStruct, 5)
doneQ := make(chan myStruct, 5)

fileName := "myfile.csv"

file, err := os.Open(fileName)
if err != nil {
    log.Fatal(err)
}

defer file.Close()

reader := bufio.NewReader(file)

// Start line parsing workers and send to jobProcessQ
for i := 1; i <= 2; i++ {
    go lineToStructWorker(i, lineParseQ, jobProcessQ)
}

// Process myStruct from jobProcessQ
for i := 1; i <= 5; i++ {
    go WorkerProcessStruct(i, jobProcessQ, doneQ)
}

lineCount := 0 
countSend := 0

for {
    line, err := reader.ReadString('\n')
    
    if err != nil && err != io.EOF {
        log.Fatal(err)
    }
    
    if err == io.EOF {
        break
    }
    
    lineCount++
    
    if lineCount > 1 {
        countSend++
        lineParseQ <- line[:len(line)-1]    // Avoid last char '\n'
    }

}

for i := 0; i < countSend; i++ {
    fmt.Printf("Received %+v.\n", <-doneQ)
}

close(doneQ)
close(jobProcessQ)
close(lineParseQ)

这是一个简化的 playground:https://play.golang.org/p/yz84g6CJraa

工人长这样:

func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct ) {

    for j := range lineQ {
        strQ <- lineToStruct(j) // just parses the csv to a struct...
    }

}

func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) {

    for a := range strQ {
        time.Sleep(time.Millisecond * 500) // fake long operation...
        done <- a
    }
}

我知道问题与“完成”通道有关,因为如果我不使用它,不会有错误,但我不知道如何解决它。

在完成将所有行发送到[=13之前,您不会开始阅读doneQ =],比缓冲区 space 多行。因此,一旦 doneQ 缓冲区已满,就会发送块,开始填充 lineParseQ 缓冲区,一旦缓冲区已满,就会死锁。将发送到 lineParseQ 的循环、从 doneQ 读取的循环或两者移动到单独的 goroutine,例如:

go func() {
    for _, line := range lines {
        countSend++
        lineParseQ <- line
    }
    close(lineParseQ)
}()

这在最后仍然会死锁,因为你在同一个 goroutine 中有一个通道上的 range 和它后面的 close;因为 range 一直持续到通道关闭,并且关闭是在 range 完成之后,所以你仍然有一个死锁。您需要将关闭放在适当的位置;如果给定频道有多个发件人,那就是在发送例程中,或者在 WaitGroup 监视发送例程时被阻止。

// Start line parsing workers and send to jobProcessQ
wg := new(sync.WaitGroup)
for i := 1; i <= 2; i++ {
    wg.Add(1)
    go lineToStructWorker(i, lineParseQ, jobProcessQ, wg)
}

// Process myStruct from jobProcessQ
for i := 1; i <= 5; i++ {
    go WorkerProcessStruct(i, jobProcessQ, doneQ)
}

countSend := 0

go func() {
    for _, line := range lines {
        countSend++
        lineParseQ <- line
    }
    close(lineParseQ)
}()

go func() {
    wg.Wait()
    close(jobProcessQ)
}()

for a := range doneQ {
    fmt.Printf("Received %v.\n", a)
}

// ...

func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct, wg *sync.WaitGroup) {
    for j := range lineQ {
        strQ <- lineToStruct(j) // just parses the csv to a struct...
    }
    wg.Done()
}

func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) {
    for a := range strQ {
        time.Sleep(time.Millisecond * 500) // fake long operation...
        done <- a
    }
    close(done)
}

完整的工作示例在这里:https://play.golang.org/p/XsnewSZeb2X

协调管道 sync.WaitGroup 将每个部分分成几个阶段。当您知道管道的一部分已完成(并且没有人正在写入特定通道)时,关闭通道以指示所有“工作人员”退出,例如

var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
    i := i
    wg.Add(1)
    go func() {
        Worker(i)
        wg.Done()
    }()
}

// wg.Wait() signals the above have completed

缓冲通道可以方便地处理突发工作负载,但有时它们用于避免糟糕设计中的死锁。如果你想避免 运行 管道的某些部分出现在 goroutine 中,你可以缓冲一些通道(通常与 worker 的数量相匹配)以避免主 goroutine 中的阻塞。

如果您有读写的依赖部分并希望避免死锁 - 确保它们位于单独的 goroutine 中。让管道的所有部分成为自己的 goroutine 甚至可以消除对缓冲通道的需要:

// putting all channel work into separate goroutines
// removes the need for buffered channels
lineParseQ := make(chan string, 0)
jobProcessQ := make(chan myStruct, 0)
doneQ := make(chan myStruct, 0)

这当然是一种权衡——一个 goroutine 的资源成本约为 2K——而缓冲通道要少得多。与大多数设计一样,这取决于它的使用方式。

也不要被臭名昭著的 Go 抓到 for-loop gotcha,所以使用闭包赋值来避免这种情况:

for i := 1; i <= 5; i++ {
    i := i       // new i (not the i above)
    go func() {
        myfunc(i) // otherwise all goroutines will most likely get '5'
    }()
}

最后确保在退出前等待所有结果处理完毕。 从基于通道的函数 return 并认为所有结果都已处理是一个常见的错误。在服务中,这最终将是真实的。但是在独立的可执行文件中,处理循环可能仍在处理结果。

go func() {
    wgW.Wait()   // waiting on worker goroutines to finish
    close(doneQ) // safe to close results channel now
}()

// ensure we don't return until all results have been processed
for a := range doneQ {
    fmt.Printf("Received %v.\n", a)
}

通过在主 goroutine 中处理结果,我们确保我们不会在未处理所有内容的情况下过早地return。

综合考虑:

https://play.golang.org/p/MjLpQ5xglP3