使用固定数量的 goroutine 迭代输入和存储输出
Iterate over inputs and store outputs using a fixed number of goroutines
我正在做一些必须是常见模式的事情,但我看不出应该如何处理它。
在这个人为的例子中,我有一个计算字符串中字母数的函数,我希望它对切片中的每个元素运行,并将结果存储在映射中,所以
[]string = {"one", "two", "three"}
产量
map[string]int = {"one":3, "two":3, "three":5}
我正在使用 guard
模式,以确保在任何时候只有 cores
个 goroutine 在运行(我认为并发 goroutine 的数量应当设置为系统上的虚拟处理器数量?)
// cores caps the number of concurrently running worker goroutines.
const cores int = 2
// words is the sample input to process.
var words = []string{"hello", "there", "this", "is", "a", "list", "of", "words"}
// result pairs an input word (name) with its letter count (res).
type result struct {
name string
res int
}
// count_letters simulates one second of work, prints the word, releases
// one guard slot, and sends the word's length on cResults.
// NOTE(review): the guard slot is released BEFORE the result is sent, so
// the semaphore no longer bounds goroutines that are blocked on the send;
// cResults has a buffer of only `cores` and main never receives from it,
// so every send after the buffer fills blocks forever (goroutine leak).
func count_letters(word string, cGuard chan struct{}, cResults chan result) {
time.Sleep(1 * time.Second)
fmt.Println(word)
<-cGuard
cResults <- result{word, len(word)}
}
// main launches one goroutine per word, using cGuard as a counting
// semaphore so at most `cores` goroutines run at a time.
func main() {
cGuard := make(chan struct{}, cores)
// NOTE(review): nothing ever receives from cResults, so after `cores`
// buffered results the remaining worker goroutines block forever.
cResults := make(chan result, cores)
mResults := map[string]int{}
for _, name := range words {
// Acquire a semaphore slot; blocks while `cores` workers are running.
cGuard <- struct{}{}
// Need to populate mResults with the output from cResults
go count_letters(name, cGuard, cResults)
}
// Blocks until the user presses enter; stands in for real synchronization.
fmt.Scanln()
}
这可行,但我不确定如何从 cResults
通道中获取 result
结构以内联填充地图。
我可以将cResults
的缓冲区大小设置为len(words)
,然后等到 for 循环结束后再把它们全部取出,但这似乎很不优雅;而且如果 words
的长度很大,这也会是个问题?
对于这个特定的用例,worker pool pattern 更合适。
在你的例子中,你为每个单词启动了一个单独的 goroutine,虽然 go 可以处理这个,但它不是很有效,因为运行时必须启动一个新的 go 例程并停止旧的,同时跟踪全部。
有了工作池,我们可以按需启动恰好数量的 goroutine,并通过通道为工作人员分配任务。这减少了很多开销,因为工作人员始终是同一批 goroutines。结果的收集也通过一个通道完成,并使用 WaitGroup 来确保我们不会在所有工作人员完成之前退出。
这是您示例的工作程序池版本:
package main
import (
"fmt"
"sync"
"time"
)
// cores is the number of worker goroutines in the pool.
// 2 for testing, in the real world runtime.NumCPU() would be used
const cores int = 2
// words is the sample input the workers will process.
var words = []string{"hello", "there", "this", "is", "a", "list", "of", "words"}
// result pairs an input word (name) with its computed letter count (res).
type result struct {
name string
res int
}
// count_letters is a pool worker: it consumes words from cWords until the
// channel is closed and drained, simulates one second of work per word,
// and reports each word's length on cResults. wg.Done fires on return.
func count_letters(wg *sync.WaitGroup, cWords chan string, cResults chan result) {
	defer wg.Done()
	for {
		w, ok := <-cWords
		if !ok {
			// cWords has been closed and emptied: no more work to do.
			return
		}
		time.Sleep(1 * time.Second)
		cResults <- result{w, len(w)}
	}
}
// main fans the words out to a fixed pool of `cores` worker goroutines,
// gathers the results into a map via a collector goroutine, and prints
// the map once every word has been processed.
func main() {
	cWords := make(chan string)
	cResults := make(chan result)
	// This waitgroup will later be used to wait for all workers to be done.
	var wg sync.WaitGroup
	for i := 0; i < cores; i++ {
		// Add 1 before starting the goroutine.
		wg.Add(1)
		go count_letters(&wg, cWords, cResults)
	}
	// Collect the results via a goroutine, since we need to submit tasks
	// and collect results at the same time (cResults is unbuffered).
	mResults := map[string]int{}
	// done signals that the collector has finished writing the map.
	// BUG FIX: the original printed mResults without waiting for the
	// collector goroutine, so the final map read raced with the
	// collector's map writes (a data race reported by `go test -race`).
	done := make(chan struct{})
	go func() {
		defer close(done)
		for r := range cResults {
			mResults[r.name] = r.res
		}
	}()
	// Insert all words into the cWords chan; each send hands one unit of
	// work to an idle worker.
	for _, word := range words {
		cWords <- word
	}
	// After all words have been inserted, close the channel; the workers
	// exit once all words have been read from it.
	close(cWords)
	// Wait for all workers to be done — after this no sends on cResults
	// can happen.
	wg.Wait()
	// Close the results chan so the collector's range loop terminates...
	close(cResults)
	// ...and wait for the collector; the channel close establishes the
	// happens-before edge that makes the map read below race-free.
	<-done
	// Use the results.
	fmt.Println(mResults)
}
我正在做一些必须是常见模式的事情,但我看不出应该如何处理它。
在这个人为的例子中,我有一个计算字符串中字母数的函数,我希望它对切片中的每个元素运行,并将结果存储在映射中,所以
[]string = {"one", "two", "three"}
产量
map[string]int = {"one":3, "two":3, "three":5}
我正在使用 guard
模式,以确保在任何时候只有 cores
个 goroutine 在运行(我认为并发 goroutine 的数量应当设置为系统上的虚拟处理器数量?)
// cores caps the number of concurrently running worker goroutines.
const cores int = 2
// words is the sample input to process.
var words = []string{"hello", "there", "this", "is", "a", "list", "of", "words"}
// result pairs an input word (name) with its letter count (res).
type result struct {
name string
res int
}
// count_letters simulates one second of work, prints the word, releases
// one guard slot, and sends the word's length on cResults.
// NOTE(review): the guard slot is released BEFORE the result is sent, so
// the semaphore no longer bounds goroutines that are blocked on the send;
// cResults has a buffer of only `cores` and main never receives from it,
// so every send after the buffer fills blocks forever (goroutine leak).
func count_letters(word string, cGuard chan struct{}, cResults chan result) {
time.Sleep(1 * time.Second)
fmt.Println(word)
<-cGuard
cResults <- result{word, len(word)}
}
// main launches one goroutine per word, using cGuard as a counting
// semaphore so at most `cores` goroutines run at a time.
func main() {
cGuard := make(chan struct{}, cores)
// NOTE(review): nothing ever receives from cResults, so after `cores`
// buffered results the remaining worker goroutines block forever.
cResults := make(chan result, cores)
mResults := map[string]int{}
for _, name := range words {
// Acquire a semaphore slot; blocks while `cores` workers are running.
cGuard <- struct{}{}
// Need to populate mResults with the output from cResults
go count_letters(name, cGuard, cResults)
}
// Blocks until the user presses enter; stands in for real synchronization.
fmt.Scanln()
}
这可行,但我不确定如何从 cResults
通道中获取 result
结构以内联填充地图。
我可以将cResults
的缓冲区大小设置为len(words)
,然后等到 for 循环结束后再把它们全部取出,但这似乎很不优雅;而且如果 words
的长度很大,这也会是个问题?
对于这个特定的用例,worker pool pattern 更合适。
在你的例子中,你为每个单词启动了一个单独的 goroutine,虽然 go 可以处理这个,但它不是很有效,因为运行时必须启动一个新的 go 例程并停止旧的,同时跟踪全部。
有了工作池,我们可以按需启动恰好数量的 goroutine,并通过通道为工作人员分配任务。这减少了很多开销,因为工作人员始终是同一批 goroutines。结果的收集也通过一个通道完成,并使用 WaitGroup 来确保我们不会在所有工作人员完成之前退出。
这是您示例的工作程序池版本:
package main
import (
"fmt"
"sync"
"time"
)
// cores is the number of worker goroutines in the pool.
// 2 for testing, in the real world runtime.NumCPU() would be used
const cores int = 2
// words is the sample input the workers will process.
var words = []string{"hello", "there", "this", "is", "a", "list", "of", "words"}
// result pairs an input word (name) with its computed letter count (res).
type result struct {
name string
res int
}
// count_letters is a pool worker: it consumes words from cWords until the
// channel is closed and drained, simulates one second of work per word,
// and reports each word's length on cResults. wg.Done fires on return.
func count_letters(wg *sync.WaitGroup, cWords chan string, cResults chan result) {
	defer wg.Done()
	for {
		w, ok := <-cWords
		if !ok {
			// cWords has been closed and emptied: no more work to do.
			return
		}
		time.Sleep(1 * time.Second)
		cResults <- result{w, len(w)}
	}
}
// main fans the words out to a fixed pool of `cores` worker goroutines,
// gathers the results into a map via a collector goroutine, and prints
// the map once every word has been processed.
func main() {
	cWords := make(chan string)
	cResults := make(chan result)
	// This waitgroup will later be used to wait for all workers to be done.
	var wg sync.WaitGroup
	for i := 0; i < cores; i++ {
		// Add 1 before starting the goroutine.
		wg.Add(1)
		go count_letters(&wg, cWords, cResults)
	}
	// Collect the results via a goroutine, since we need to submit tasks
	// and collect results at the same time (cResults is unbuffered).
	mResults := map[string]int{}
	// done signals that the collector has finished writing the map.
	// BUG FIX: the original printed mResults without waiting for the
	// collector goroutine, so the final map read raced with the
	// collector's map writes (a data race reported by `go test -race`).
	done := make(chan struct{})
	go func() {
		defer close(done)
		for r := range cResults {
			mResults[r.name] = r.res
		}
	}()
	// Insert all words into the cWords chan; each send hands one unit of
	// work to an idle worker.
	for _, word := range words {
		cWords <- word
	}
	// After all words have been inserted, close the channel; the workers
	// exit once all words have been read from it.
	close(cWords)
	// Wait for all workers to be done — after this no sends on cResults
	// can happen.
	wg.Wait()
	// Close the results chan so the collector's range loop terminates...
	close(cResults)
	// ...and wait for the collector; the channel close establishes the
	// happens-before edge that makes the map read below race-free.
	<-done
	// Use the results.
	fmt.Println(mResults)
}