使用 goroutines 处理值并将结果收集到切片中
Using goroutines to process values and gather results into a slice
我最近在探索 Go,goroutines 的工作方式让我很困惑。
我尝试使用 goroutines 将我之前编写的代码移植到 Go 中,但出现 fatal error: all goroutines are asleep - deadlock!
错误。
我想做的是使用 goroutines 处理列表中的项目,然后将处理后的值收集到一个新列表中。但是我在 "gathering" 部分遇到了问题。
代码:
sampleChan := make(chan sample)
var wg sync.WaitGroup
// Read from contents list
for i, line := range contents {
wg.Add(1)
// Process each item with a goroutine and send output to sampleChan
go newSample(line, *replicatePtr, *timePtr, sampleChan, &wg)
}
wg.Wait()
// Read from sampleChan and put into a slice
var sampleList []sample
for s := range sampleChan {
sampleList = append(sampleList, s)
}
close(sampleChan)
从 goroutines 收集结果的正确方法是什么?
我知道切片不是线程安全的,所以我不能让每个 goroutine 都附加到切片上。
有两个问题
- 使用无缓冲通道:无缓冲通道阻塞接收方直到数据在通道上可用,发送方直到接收方available.That导致错误
- 在范围之前不关闭通道: 因为你永远不会关闭通道,范围循环将永远不会结束。
您必须使用 buffered
频道和 close
范围
之前的频道
代码
package main
import (
"fmt"
"sync"
)
func double(line int, ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
ch <- line * 2
}
func main() {
contents := []int{1, 2, 3, 4, 5}
sampleChan := make(chan int,len(contents))
var wg sync.WaitGroup
// Read from contents list
for _, line := range contents {
wg.Add(1)
go double(line, sampleChan, &wg)
}
wg.Wait()
close(sampleChan)
// Read from sampleChan and put into a slice
var sampleList []int
for s := range sampleChan {
sampleList = append(sampleList, s)
}
fmt.Println(sampleList)
}
播放 link : https://play.golang.org/p/k03vt3hd3P
编辑:
另一种获得更好性能的方法是 运行 producer
和 consumer
同时
修改后的代码
package main
import (
"fmt"
"sync"
)
func doubleLines(lines []int, wg *sync.WaitGroup, sampleChan chan int) {
defer wg.Done()
defer close(sampleChan)
var w sync.WaitGroup
for _, line := range lines {
w.Add(1)
go double(&w, line, sampleChan)
}
w.Wait()
}
func double(wg *sync.WaitGroup, line int, ch chan int) {
defer wg.Done()
ch <- line * 2
}
func collectResult(wg *sync.WaitGroup, channel chan int, sampleList *[]int) {
defer wg.Done()
for s := range channel {
*sampleList = append(*sampleList, s)
}
}
func main() {
contents := []int{0,1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19}
sampleChan := make(chan int, 1)
var sampleList []int
var wg sync.WaitGroup
wg.Add(1)
go doubleLines(contents, &wg, sampleChan)
wg.Add(1)
go collectResult(&wg, sampleChan, &sampleList)
wg.Wait()
fmt.Println(sampleList)
}
您的代码几乎是正确的。有几个问题:首先,您在收集结果之前等待所有工作人员完成,其次,您的 for
循环在通道关闭时终止,但通道仅在 for
循环终止。
您可以通过在工作人员完成时异步关闭通道来修复代码:
for i, line := range contents {
wg.Add(1)
// Process each item with a goroutine and send output to sampleChan
go newSample(line, *replicatePtr, *timePtr, sampleChan, &wg)
}
go func() {
wg.Wait()
close(sampleChan)
}()
for s := range sampleChan {
..
}
作为风格说明(和 https://github.com/golang/go/wiki/CodeReviewComments#synchronous-functions 之后),如果 newSample
是一个简单的同步函数,不使用等待组和通道,并且简单地生成它的结果。那么工作人员代码将如下所示:
for i, line := range contents {
wg.Add(1)
go func(line string) {
defer wg.Done()
sampleChan <- newSample(line, *replicatePtr, *timePtr)
}(line)
}
这使您的并发原语保持在一起,除了简化 newSample
并使其更易于测试之外,它还允许您查看并发情况,并直观地检查 wg.Done()
总是被调用。如果您想重构代码以使用固定数量的工作人员,那么您的更改都将是本地的。
我最近在探索 Go,goroutines 的工作方式让我很困惑。
我尝试使用 goroutines 将我之前编写的代码移植到 Go 中,但出现 fatal error: all goroutines are asleep - deadlock!
错误。
我想做的是使用 goroutines 处理列表中的项目,然后将处理后的值收集到一个新列表中。但是我在 "gathering" 部分遇到了问题。
代码:
sampleChan := make(chan sample)
var wg sync.WaitGroup
// Read from contents list
for i, line := range contents {
wg.Add(1)
// Process each item with a goroutine and send output to sampleChan
go newSample(line, *replicatePtr, *timePtr, sampleChan, &wg)
}
wg.Wait()
// Read from sampleChan and put into a slice
var sampleList []sample
for s := range sampleChan {
sampleList = append(sampleList, s)
}
close(sampleChan)
从 goroutines 收集结果的正确方法是什么?
我知道切片不是线程安全的,所以我不能让每个 goroutine 都附加到切片上。
有两个问题
- 使用无缓冲通道:无缓冲通道阻塞接收方直到数据在通道上可用,发送方直到接收方available.That导致错误
- 在范围之前不关闭通道: 因为你永远不会关闭通道,范围循环将永远不会结束。
您必须使用 buffered
频道和 close
范围
代码
package main
import (
"fmt"
"sync"
)
func double(line int, ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
ch <- line * 2
}
func main() {
contents := []int{1, 2, 3, 4, 5}
sampleChan := make(chan int,len(contents))
var wg sync.WaitGroup
// Read from contents list
for _, line := range contents {
wg.Add(1)
go double(line, sampleChan, &wg)
}
wg.Wait()
close(sampleChan)
// Read from sampleChan and put into a slice
var sampleList []int
for s := range sampleChan {
sampleList = append(sampleList, s)
}
fmt.Println(sampleList)
}
播放 link : https://play.golang.org/p/k03vt3hd3P
编辑:
另一种获得更好性能的方法是 运行 producer
和 consumer
同时
修改后的代码
package main
import (
"fmt"
"sync"
)
func doubleLines(lines []int, wg *sync.WaitGroup, sampleChan chan int) {
defer wg.Done()
defer close(sampleChan)
var w sync.WaitGroup
for _, line := range lines {
w.Add(1)
go double(&w, line, sampleChan)
}
w.Wait()
}
func double(wg *sync.WaitGroup, line int, ch chan int) {
defer wg.Done()
ch <- line * 2
}
func collectResult(wg *sync.WaitGroup, channel chan int, sampleList *[]int) {
defer wg.Done()
for s := range channel {
*sampleList = append(*sampleList, s)
}
}
func main() {
contents := []int{0,1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19}
sampleChan := make(chan int, 1)
var sampleList []int
var wg sync.WaitGroup
wg.Add(1)
go doubleLines(contents, &wg, sampleChan)
wg.Add(1)
go collectResult(&wg, sampleChan, &sampleList)
wg.Wait()
fmt.Println(sampleList)
}
您的代码几乎是正确的。有几个问题:首先,您在收集结果之前等待所有工作人员完成,其次,您的 for
循环在通道关闭时终止,但通道仅在 for
循环终止。
您可以通过在工作人员完成时异步关闭通道来修复代码:
for i, line := range contents {
wg.Add(1)
// Process each item with a goroutine and send output to sampleChan
go newSample(line, *replicatePtr, *timePtr, sampleChan, &wg)
}
go func() {
wg.Wait()
close(sampleChan)
}()
for s := range sampleChan {
..
}
作为风格说明(和 https://github.com/golang/go/wiki/CodeReviewComments#synchronous-functions 之后),如果 newSample
是一个简单的同步函数,不使用等待组和通道,并且简单地生成它的结果。那么工作人员代码将如下所示:
for i, line := range contents {
wg.Add(1)
go func(line string) {
defer wg.Done()
sampleChan <- newSample(line, *replicatePtr, *timePtr)
}(line)
}
这使您的并发原语保持在一起,除了简化 newSample
并使其更易于测试之外,它还允许您查看并发情况,并直观地检查 wg.Done()
总是被调用。如果您想重构代码以使用固定数量的工作人员,那么您的更改都将是本地的。