使用固定数量的 goroutine 迭代输入和存储输出

Iterate over inputs and store outputs using a fixed number of goroutines

我正在做一些必须是常见模式的事情,但我看不出应该如何处理它。

在这个人为的例子中,我有一个计算字符串中字母的函数,我希望它对切片中的每个元素进行 运行,并将结果存储在映射中,所以

[]string = {"one", "two", "three"}

产量

map[string]int = {"one":3, "two":3, "three":5}

我正在使用 guard 模式,以确保在任何时候只有 cores 个 goroutines 运行ning(我认为拥有并发 goroutines 的数量设置为系统上的虚拟处理器数量?)

const cores int = 2

var words = []string{"hello", "there", "this", "is", "a", "list", "of", "words"}

type result struct {
    name string
    res  int
}

func count_letters(word string, cGuard chan struct{}, cResults chan result) {
    time.Sleep(1 * time.Second)
    fmt.Println(word)
    <-cGuard
    cResults <- result{word, len(word)}
}

func main() {
    cGuard := make(chan struct{}, cores)
    cResults := make(chan result, cores)

    mResults := map[string]int{}

    for _, name := range words {
        cGuard <- struct{}{}
        // Need to populate mResults with the output from cResults 
        go count_letters(name, cGuard, cResults)
    }
    fmt.Scanln()
}

这可行,但我不确定如何从 cResults 通道中获取 result 结构以内联填充地图。

我可以将cResults的缓冲区大小设置为len(words),然后等到for循环结束,然后将它们全部拉出,但这似乎很不优雅,而且是个问题如果words的长度很大?

对于这个特定的用例,worker pool pattern 更合适。

在你的例子中,你为每个单词启动了一个单独的 goroutine,虽然 go 可以处理这个,但它不是很有效,因为运行时必须启动一个新的 go 例程并停止旧的,同时跟踪全部。

有了工作池,我们可以根据需要启动恰好数量的 goroutine,并通过通道为工作人员分配任务。这减少了很多开销工作人员总是相同的 goroutines。结果的收集也是通过一个通道完成的。并使用 WaitGroup 来确保我们不会在所有工作人员完成之前终止。

这是您示例的工作程序池版本:

package main

import (
    "fmt"
    "sync"
    "time"
)

// 2 for testing, in the real world runtime.NumCPU() would be used
const cores int = 2

var words = []string{"hello", "there", "this", "is", "a", "list", "of", "words"}

type result struct {
    name string
    res  int
}

func count_letters(wg *sync.WaitGroup, cWords chan string, cResults chan result) {
    // Tell the waitgroup we are done once we return
    defer wg.Done()

    // Read from cWords until it is closed, at which point we return
    for word := range cWords {
        time.Sleep(1 * time.Second)
        cResults <- result{word, len(word)}
    }
}

func main() {
    cWords := make(chan string)
    cResults := make(chan result)

    // This waitgroup will later be used to wait for all worker to be done
    var wg sync.WaitGroup
    for i := 0; i < cores; i++ {
        // Add 1 before starting the goroutine
        wg.Add(1)
        go count_letters(&wg, cWords, cResults)
    }

    // Collect the results via a goroutine, since we need to submit tasks and collect results at the same time
    mResults := map[string]int{}
    go func() {
        for result := range cResults {
            mResults[result.name] = result.res
        }
    }()

    // Insert all words into the cWords chan
    for _, word := range words {
        cWords <- word
    }

    // After all words have been inserted, close the channel, this will cause the workers to exit
    // once all words have been read from the channel
    close(cWords)
    // Wait for all workers to be done
    wg.Wait()
    // Close the results chan, this will terminate our collection go routine, good practice but not necessary in this
    // specific example
    close(cResults)

    // Use the results
    fmt.Println(mResults)
}