《The Go Programming Language》一书示例中的 goroutine 泄漏

goroutine leak in example of book The Go Programming Language

我正在阅读 The Go Programming Language 一书,书中有一个演示 goroutine 泄漏的示例


func mirroredQuery() string {
    responses := make(chan string, 3)
    go func() { responses <- request("asia.gopl.io") }()
    go func() { responses <- request("europe.gopl.io") }()
    go func() { responses <- request("americas.gopl.io") }()
    return <-responses // return the quickest response
}
func request(hostname string) (response string) { /* ... */ }

并且我已经尝试解决了漏洞,得到了如下代码

func request(url string) string {
    res, err := http.Get(url)
    if err == nil {
        body, err := io.ReadAll(res.Body)
        if err == nil {
            return string(body)
        } else {
            return err.Error()
        }
    } else {
        return err.Error()
    }
}

func getany() string {
    rsp := make(chan string, 3)
    done := make(chan struct{}, 3)
    doRequest := func(url string) {
        select {
            case rsp <- request(url):
                fmt.Printf("get %s\n", url)
                done <- struct{}{}
            case <- done:
                fmt.Printf("stop %s\n", url)
                return
        }
    }
    go doRequest("http://google.com")
    go doRequest("http://qq.com")
    go doRequest("http://baidu.com")
    return <-rsp
}

但是好像没有解决问题?有什么建议吗?

使用上下文和sync.WaitGroup

为了避免 goroutine 泄漏,您可能希望确保一旦您从 mirroredQuery 中 return,最初在此函数中创建的 goroutine 不会保留 运行ning?

在那种情况下,最重要的是能够 取消 其他 goroutines 当其中一个 goroutines 成功完成请求时。这种取消是在 Go 中使用 context.Context 实现的,net/http 支持。

一旦你有上下文取消,你需要在你的 main 函数中有一个 sync.WaitGroup 来等待所有的 goroutines 成为 Done.

这是一个 doRequest,它使用上下文并包装了本书 request 函数的“HTTP get”功能:

func doRequest(ctx context.Context, url string) string {
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        log.Fatal(err)
    }
    res, err := http.DefaultClient.Do(req)

    // err will be non-nil also if the request was canceled
    if err != nil {
        return ""
    }
    defer res.Body.Close()
    b, err := io.ReadAll(res.Body)
    if err != nil {
        return ""
    }
    return string(b)
}
如果上下文被取消,

http.DefaultClient.Do 将提前 return,并出现适当的错误。

现在,处理 goroutine 的函数变成了:

func mirroredQuery() string {
    ctx, cancel := context.WithCancel(context.Background())
    responses := make(chan string, 3)

    fetcher := func(url string, wg *sync.WaitGroup) {
        res := doRequest(ctx, url)
        if res != "" {
            responses <- res
        }
        wg.Done()
    }

    urls := []string{
        "asia.gopl.io",
        "europe.gopl.io",
        "http://google.com",
    }

    var wg sync.WaitGroup
    for _, url := range urls {
        wg.Add(1)
        go fetcher(url, &wg)
    }

    res := <-responses
    fmt.Println("got response", res[:300])
    cancel()

    wg.Wait()
    return res
}

注意几点:

  • 每个 goroutine 运行s doRequest 并且只在结果非空时将结果写入 responses(意味着没有发生错误;取消在这里算作错误)
  • WaitGroup用于等待所有worker goroutines退出
  • 主 goroutine 启动所有 worker 然后等待 responses 中的第一个(非空)结果;然后它调用 cancel 取消上下文,它发出所有 worker goroutines 退出的信号,并等待它们完成。

作为练习,扩展此代码以解决几个问题:

  • 区分真正的错误和取消;在当前代码中,如果所有工作人员 运行 都陷入错误
  • ,则可能会出现死锁
  • 使用 select.
  • 向主 goroutine 中读取的 <- responses 添加超时
  • 尽快将代码写入 return 第一个结果给调用者,而后台 goroutine 可以处理取消上下文并等待 workers 退出。毕竟,这里的主要目标是 return 快速获得结果。

你看错书了。本书使用示例来说明如何使用缓冲通道来避免 goroutine 泄漏。

这是紧接着书中示例(第 233 页)的段落:

Had we used an unbuffered channel, the two slower goroutines would have gotten stuck trying to send their responses on a channel from which no goroutine will ever receive. This situation, called a goroutine leak, would be a bug. Unlike garbage variables, leaked goroutines are not automatically collected, so it is important to make sure that goroutines terminate themselves when no longer needed.

注:

  1. 此函数不会尝试优化内存占用或资源使用(包括网络资源)。 Go 的 net/http 包的客户端函数是上下文感知的,因此它可以在请求中间取消,这将节省一些资源(是否对问题很重要将是一个设计决定)。

要使用上下文,您可以:

func mirroredQuery() string {
    responses := make(chan string, 3)
    ctx, cf := context.WithCancel(context.Background())
    defer cf()

    go func() { responses <- request("asia.gopl.io") }()
    go func() { responses <- request("europe.gopl.io") }()
    go func() { responses <- request("americas.gopl.io") }()
    return <-responses // return the quickest response
}

func request(ctx context.Context, url string) string {
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        panic(err)
    }
    res, err := http.DefaultClient.Do(req)
    if err == nil {
        body, err := io.ReadAll(res.Body)
        if err == nil {
            return string(body)
        } else {
            return err.Error()
        }
    } else {
        return err.Error()
    }
}
  1. 使用缓冲通道分配内存。当 goroutine 太多时,使用缓冲通道太浪费了。

要解决这个问题,您可以使用频道(就像您尝试的那样):

func getAny() string {
    responses := make(chan string)
    ctx, cf := context.WithCancel(context.Background())
    defer cf()
    done := make(chan struct{})
    defer close(done)

    doRequest := func(url string) {
        select {
        case responses <- request(ctx, url):
            fmt.Printf("get %s\n", url)
        case <-done:
            fmt.Printf("stop %s\n", url)
            return
        }
    }

    go doRequest("http://google.com")
    go doRequest("http://qq.com")
    go doRequest("http://baidu.com")
    return <-responses // return the quickest response
}

在关闭的频道上总是立即“returns”接收零值,因此用作广播。使用这种“完成通道”是常见的做法。

你也可以使用context.Context:

func mirroredQuery() string {
    responses := make(chan string)
    ctx, cf := context.WithCancel(context.Background())
    defer cf()

    doRequest := func(url string) {
        select {
        case responses <- request(ctx, url):
            fmt.Printf("get %s\n", url)
        case <-ctx.Done():
            fmt.Printf("stop %s\n", url)
            return
        }
    }

    go doRequest("http://google.com")
    go doRequest("http://qq.com")
    go doRequest("http://baidu.com")
    return <-responses // return the quickest response
}

在这种情况下更好,因为您已经使用了带有 http 的 context.Context

  1. 使用 sync.WorkGroup 会等待所有请求完成,但 returns 第一个请求。我认为这违背了该功能的目的,并且几乎没有任何好处。而且我不认为在函数本身 returns 之前使所有 goroutines 产生函数 returns 是有意义的(除非该函数是主要函数)。

提供的代码中没有 goroutine 泄漏。 mirroredQuery 方法使用缓冲通道收集结果和 return 第一个答案。当前缓冲区有足够的 space 来收集所有 goroutines 的所有答案,即使从未读取其余响应。如果缓冲区小于 N - 1,情况就会改变,其中 N 是生成的 goroutine 的数量。在这种情况下,mirroredQuery 生成的一些 goroutine 将在尝试向 responses 通道发送响应时卡住。重复调用 mirroredQuery 会导致卡住的 goroutines 增加,这可以称为 goroutines leak。

这是添加了日志的代码以及两种情况的输出。

func mirroredQuery() string {
    responses := make(chan string, 2)
    go func() {
        responses <- request("asia.gopl.io")
        log.Printf("Finished goroutine asia.gopl.io\n")
    }()
    go func() {
        responses <- request("europe.gopl.io")
        log.Printf("Finished goroutine europe.gopl.io\n")
    }()
    go func() {
        responses <- request("americas.gopl.io")
        log.Printf("Finished goroutine americas.gopl.io\n")
    }()
    return <-responses // return the quickest response
}
func request(hostname string) (response string) {
    duration := time.Duration(rand.Int63n(5000)) * time.Millisecond
    time.Sleep(duration)
    return hostname
}

func main() {
    rand.Seed(time.Now().UnixNano())
    result := mirroredQuery()
    log.Printf("Fastest result for %s\n", result)
    time.Sleep(6*time.Second)
}

缓冲区大小的输出 >= N-1

2021/06/26 16:05:27 Finished europe.gopl.io
2021/06/26 16:05:27 Fastest result for europe.gopl.io
2021/06/26 16:05:28 Finished asia.gopl.io
2021/06/26 16:05:30 Finished americas.gopl.io

Process finished with the exit code 0

缓冲区大小 < N-1 的输出

2021/06/26 15:47:54 Finished europe.gopl.io
2021/06/26 15:47:54 Fastest result for europe.gopl.io

Process finished with the exit code 0

可以通过在第一个响应到达时引入 goroutines 终止来“改进”上述实现。这可能会减少使用的资源数量。这在很大程度上取决于 request 方法的作用。对于计算量大的场景这是有意义的,因为取消 http 请求可能会导致连接终止,因此下一个请求必须打开新的请求。对于高负载的服务器,即使不使用响应,它也可能不如等待响应有效。

下面是使用 context 用法的改进实现。

func mirroredQuery() string {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    responses := make(chan string)
    f := func(hostname string) {
        response, err := request(ctx, hostname)
        if err != nil {
            log.Printf("Finished %s with error %s\n", hostname, err)
            return
        }
        responses <- response
        log.Printf("Finished %s\n", hostname)
    }
    go f("asia.gopl.io")
    go f("europe.gopl.io")
    go f("americas.gopl.io")
    return <-responses // return the quickest response
}

func request(ctx context.Context, hostname string) (string, error) {
    duration := time.Duration(rand.Int63n(5000)) * time.Millisecond
    after := time.After(duration)
    select {
    case <-ctx.Done():
        return "", ctx.Err()
    case <-after:
        return "response for "+hostname, nil
    }
}

func main() {
    rand.Seed(time.Now().UnixNano())
    result := mirroredQuery()
    log.Printf("Fastest result for %s\n", result)
    time.Sleep(6 * time.Second)
}