使用通道作为队列的死锁
Deadlock using channels as queues
我正在学习 Go,我正在尝试实现一个作业队列。
我想做的是:
让主要的 goroutine feed lines 通过多个解析器工作者的通道(将一行解析为 s 结构),并让每个解析器将结构发送到其他工作者(goroutines)将处理的结构通道(发送到数据库等)。
代码如下所示:
lineParseQ := make(chan string, 5)
jobProcessQ := make(chan myStruct, 5)
doneQ := make(chan myStruct, 5)
fileName := "myfile.csv"
file, err := os.Open(fileName)
if err != nil {
log.Fatal(err)
}
defer file.Close()
reader := bufio.NewReader(file)
// Start line parsing workers and send to jobProcessQ
for i := 1; i <= 2; i++ {
go lineToStructWorker(i, lineParseQ, jobProcessQ)
}
// Process myStruct from jobProcessQ
for i := 1; i <= 5; i++ {
go WorkerProcessStruct(i, jobProcessQ, doneQ)
}
lineCount := 0
countSend := 0
for {
line, err := reader.ReadString('\n')
if err != nil && err != io.EOF {
log.Fatal(err)
}
if err == io.EOF {
break
}
lineCount++
if lineCount > 1 {
countSend++
lineParseQ <- line[:len(line)-1] // Avoid last char '\n'
}
}
for i := 0; i < countSend; i++ {
fmt.Printf("Received %+v.\n", <-doneQ)
}
close(doneQ)
close(jobProcessQ)
close(lineParseQ)
这是一个简化的 playground:https://play.golang.org/p/yz84g6CJraa
工人长这样:
func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct ) {
for j := range lineQ {
strQ <- lineToStruct(j) // just parses the csv to a struct...
}
}
func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) {
for a := range strQ {
time.Sleep(time.Millisecond * 500) // fake long operation...
done <- a
}
}
我知道问题与“完成”通道有关,因为如果我不使用它,不会有错误,但我不知道如何解决它。
在完成将所有行发送到[=13之前,您不会开始阅读doneQ
=],比缓冲区 space 多行。因此,一旦 doneQ
缓冲区已满,就会发送块,开始填充 lineParseQ
缓冲区,一旦缓冲区已满,就会死锁。将发送到 lineParseQ
的循环、从 doneQ
读取的循环或两者移动到单独的 goroutine,例如:
go func() {
for _, line := range lines {
countSend++
lineParseQ <- line
}
close(lineParseQ)
}()
这在最后仍然会死锁,因为你在同一个 goroutine 中有一个通道上的 range
和它后面的 close
;因为 range
一直持续到通道关闭,并且关闭是在 range
完成之后,所以你仍然有一个死锁。您需要将关闭放在适当的位置;如果给定频道有多个发件人,那就是在发送例程中,或者在 WaitGroup
监视发送例程时被阻止。
// Start line parsing workers and send to jobProcessQ
wg := new(sync.WaitGroup)
for i := 1; i <= 2; i++ {
wg.Add(1)
go lineToStructWorker(i, lineParseQ, jobProcessQ, wg)
}
// Process myStruct from jobProcessQ
for i := 1; i <= 5; i++ {
go WorkerProcessStruct(i, jobProcessQ, doneQ)
}
countSend := 0
go func() {
for _, line := range lines {
countSend++
lineParseQ <- line
}
close(lineParseQ)
}()
go func() {
wg.Wait()
close(jobProcessQ)
}()
for a := range doneQ {
fmt.Printf("Received %v.\n", a)
}
// ...
func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct, wg *sync.WaitGroup) {
for j := range lineQ {
strQ <- lineToStruct(j) // just parses the csv to a struct...
}
wg.Done()
}
func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) {
for a := range strQ {
time.Sleep(time.Millisecond * 500) // fake long operation...
done <- a
}
close(done)
}
完整的工作示例在这里:https://play.golang.org/p/XsnewSZeb2X
协调管道 sync.WaitGroup
将每个部分分成几个阶段。当您知道管道的一部分已完成(并且没有人正在写入特定通道)时,关闭通道以指示所有“工作人员”退出,例如
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
i := i
wg.Add(1)
go func() {
Worker(i)
wg.Done()
}()
}
// wg.Wait() signals the above have completed
缓冲通道可以方便地处理突发工作负载,但有时它们用于避免糟糕设计中的死锁。如果你想避免 运行 管道的某些部分出现在 goroutine 中,你可以缓冲一些通道(通常与 worker 的数量相匹配)以避免主 goroutine 中的阻塞。
如果您有读写的依赖部分并希望避免死锁 - 确保它们位于单独的 goroutine 中。让管道的所有部分成为自己的 goroutine 甚至可以消除对缓冲通道的需要:
// putting all channel work into separate goroutines
// removes the need for buffered channels
lineParseQ := make(chan string, 0)
jobProcessQ := make(chan myStruct, 0)
doneQ := make(chan myStruct, 0)
这当然是一种权衡——一个 goroutine 的资源成本约为 2K——而缓冲通道要少得多。与大多数设计一样,这取决于它的使用方式。
也不要被臭名昭著的 Go 抓到 for-loop gotcha,所以使用闭包赋值来避免这种情况:
for i := 1; i <= 5; i++ {
i := i // new i (not the i above)
go func() {
myfunc(i) // otherwise all goroutines will most likely get '5'
}()
}
最后确保在退出前等待所有结果处理完毕。
从基于通道的函数 return 并认为所有结果都已处理是一个常见的错误。在服务中,这最终将是真实的。但是在独立的可执行文件中,处理循环可能仍在处理结果。
go func() {
wgW.Wait() // waiting on worker goroutines to finish
close(doneQ) // safe to close results channel now
}()
// ensure we don't return until all results have been processed
for a := range doneQ {
fmt.Printf("Received %v.\n", a)
}
通过在主 goroutine 中处理结果,我们确保我们不会在未处理所有内容的情况下过早地return。
综合考虑:
我正在学习 Go,我正在尝试实现一个作业队列。
我想做的是:
让主要的 goroutine feed lines 通过多个解析器工作者的通道(将一行解析为 s 结构),并让每个解析器将结构发送到其他工作者(goroutines)将处理的结构通道(发送到数据库等)。
代码如下所示:
lineParseQ := make(chan string, 5)
jobProcessQ := make(chan myStruct, 5)
doneQ := make(chan myStruct, 5)
fileName := "myfile.csv"
file, err := os.Open(fileName)
if err != nil {
log.Fatal(err)
}
defer file.Close()
reader := bufio.NewReader(file)
// Start line parsing workers and send to jobProcessQ
for i := 1; i <= 2; i++ {
go lineToStructWorker(i, lineParseQ, jobProcessQ)
}
// Process myStruct from jobProcessQ
for i := 1; i <= 5; i++ {
go WorkerProcessStruct(i, jobProcessQ, doneQ)
}
lineCount := 0
countSend := 0
for {
line, err := reader.ReadString('\n')
if err != nil && err != io.EOF {
log.Fatal(err)
}
if err == io.EOF {
break
}
lineCount++
if lineCount > 1 {
countSend++
lineParseQ <- line[:len(line)-1] // Avoid last char '\n'
}
}
for i := 0; i < countSend; i++ {
fmt.Printf("Received %+v.\n", <-doneQ)
}
close(doneQ)
close(jobProcessQ)
close(lineParseQ)
这是一个简化的 playground:https://play.golang.org/p/yz84g6CJraa
工人长这样:
func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct ) {
for j := range lineQ {
strQ <- lineToStruct(j) // just parses the csv to a struct...
}
}
func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) {
for a := range strQ {
time.Sleep(time.Millisecond * 500) // fake long operation...
done <- a
}
}
我知道问题与“完成”通道有关,因为如果我不使用它,不会有错误,但我不知道如何解决它。
在完成将所有行发送到[=13之前,您不会开始阅读doneQ
=],比缓冲区 space 多行。因此,一旦 doneQ
缓冲区已满,就会发送块,开始填充 lineParseQ
缓冲区,一旦缓冲区已满,就会死锁。将发送到 lineParseQ
的循环、从 doneQ
读取的循环或两者移动到单独的 goroutine,例如:
go func() {
for _, line := range lines {
countSend++
lineParseQ <- line
}
close(lineParseQ)
}()
这在最后仍然会死锁,因为你在同一个 goroutine 中有一个通道上的 range
和它后面的 close
;因为 range
一直持续到通道关闭,并且关闭是在 range
完成之后,所以你仍然有一个死锁。您需要将关闭放在适当的位置;如果给定频道有多个发件人,那就是在发送例程中,或者在 WaitGroup
监视发送例程时被阻止。
// Start line parsing workers and send to jobProcessQ
wg := new(sync.WaitGroup)
for i := 1; i <= 2; i++ {
wg.Add(1)
go lineToStructWorker(i, lineParseQ, jobProcessQ, wg)
}
// Process myStruct from jobProcessQ
for i := 1; i <= 5; i++ {
go WorkerProcessStruct(i, jobProcessQ, doneQ)
}
countSend := 0
go func() {
for _, line := range lines {
countSend++
lineParseQ <- line
}
close(lineParseQ)
}()
go func() {
wg.Wait()
close(jobProcessQ)
}()
for a := range doneQ {
fmt.Printf("Received %v.\n", a)
}
// ...
func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct, wg *sync.WaitGroup) {
for j := range lineQ {
strQ <- lineToStruct(j) // just parses the csv to a struct...
}
wg.Done()
}
func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) {
for a := range strQ {
time.Sleep(time.Millisecond * 500) // fake long operation...
done <- a
}
close(done)
}
完整的工作示例在这里:https://play.golang.org/p/XsnewSZeb2X
协调管道 sync.WaitGroup
将每个部分分成几个阶段。当您知道管道的一部分已完成(并且没有人正在写入特定通道)时,关闭通道以指示所有“工作人员”退出,例如
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
i := i
wg.Add(1)
go func() {
Worker(i)
wg.Done()
}()
}
// wg.Wait() signals the above have completed
缓冲通道可以方便地处理突发工作负载,但有时它们用于避免糟糕设计中的死锁。如果你想避免 运行 管道的某些部分出现在 goroutine 中,你可以缓冲一些通道(通常与 worker 的数量相匹配)以避免主 goroutine 中的阻塞。
如果您有读写的依赖部分并希望避免死锁 - 确保它们位于单独的 goroutine 中。让管道的所有部分成为自己的 goroutine 甚至可以消除对缓冲通道的需要:
// putting all channel work into separate goroutines
// removes the need for buffered channels
lineParseQ := make(chan string, 0)
jobProcessQ := make(chan myStruct, 0)
doneQ := make(chan myStruct, 0)
这当然是一种权衡——一个 goroutine 的资源成本约为 2K——而缓冲通道要少得多。与大多数设计一样,这取决于它的使用方式。
也不要被臭名昭著的 Go 抓到 for-loop gotcha,所以使用闭包赋值来避免这种情况:
for i := 1; i <= 5; i++ {
i := i // new i (not the i above)
go func() {
myfunc(i) // otherwise all goroutines will most likely get '5'
}()
}
最后确保在退出前等待所有结果处理完毕。 从基于通道的函数 return 并认为所有结果都已处理是一个常见的错误。在服务中,这最终将是真实的。但是在独立的可执行文件中,处理循环可能仍在处理结果。
go func() {
wgW.Wait() // waiting on worker goroutines to finish
close(doneQ) // safe to close results channel now
}()
// ensure we don't return until all results have been processed
for a := range doneQ {
fmt.Printf("Received %v.\n", a)
}
通过在主 goroutine 中处理结果,我们确保我们不会在未处理所有内容的情况下过早地return。
综合考虑: