《The Go Programming Language》一书示例中的 goroutine 泄漏
goroutine leak in example of book The Go Programming Language
我正在阅读 The Go Programming Language 一书,书中有一个演示 goroutine 泄漏的示例
func mirroredQuery() string {
responses := make(chan string, 3)
go func() { responses <- request("asia.gopl.io") }()
go func() { responses <- request("europe.gopl.io") }()
go func() { responses <- request("americas.gopl.io") }()
return <-responses // return the quickest response
}
func request(hostname string) (response string) { /* ... */ }
并且我已经尝试解决了漏洞,得到了如下代码
func request(url string) string {
res, err := http.Get(url)
if err == nil {
body, err := io.ReadAll(res.Body)
if err == nil {
return string(body)
} else {
return err.Error()
}
} else {
return err.Error()
}
}
func getany() string {
rsp := make(chan string, 3)
done := make(chan struct{}, 3)
doRequest := func(url string) {
select {
case rsp <- request(url):
fmt.Printf("get %s\n", url)
done <- struct{}{}
case <- done:
fmt.Printf("stop %s\n", url)
return
}
}
go doRequest("http://google.com")
go doRequest("http://qq.com")
go doRequest("http://baidu.com")
return <-rsp
}
但是好像没有解决问题?有什么建议吗?
使用上下文和sync.WaitGroup
为了避免 goroutine 泄漏,您可能希望确保一旦您从 mirroredQuery
中 return,最初在此函数中创建的 goroutine 不会保留 运行ning?
在那种情况下,最重要的是能够 取消 其他 goroutines 当其中一个 goroutines 成功完成请求时。这种取消是在 Go 中使用 context.Context
实现的,net/http
支持。
一旦你有上下文取消,你需要在你的 main 函数中有一个 sync.WaitGroup
来等待所有的 goroutines 成为 Done
.
这是一个 doRequest
,它使用上下文并包装了本书 request
函数的“HTTP get”功能:
func doRequest(ctx context.Context, url string) string {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
log.Fatal(err)
}
res, err := http.DefaultClient.Do(req)
// err will be non-nil also if the request was canceled
if err != nil {
return ""
}
defer res.Body.Close()
b, err := io.ReadAll(res.Body)
if err != nil {
return ""
}
return string(b)
}
如果上下文被取消,http.DefaultClient.Do
将提前 return,并出现适当的错误。
现在,处理 goroutine 的函数变成了:
func mirroredQuery() string {
ctx, cancel := context.WithCancel(context.Background())
responses := make(chan string, 3)
fetcher := func(url string, wg *sync.WaitGroup) {
res := doRequest(ctx, url)
if res != "" {
responses <- res
}
wg.Done()
}
urls := []string{
"asia.gopl.io",
"europe.gopl.io",
"http://google.com",
}
var wg sync.WaitGroup
for _, url := range urls {
wg.Add(1)
go fetcher(url, &wg)
}
res := <-responses
fmt.Println("got response", res[:300])
cancel()
wg.Wait()
return res
}
注意几点:
- 每个 goroutine 运行s
doRequest
并且只在结果非空时将结果写入 responses
(意味着没有发生错误;取消在这里算作错误)
WaitGroup
用于等待所有worker goroutines退出
- 主 goroutine 启动所有 worker 然后等待
responses
中的第一个(非空)结果;然后它调用 cancel
取消上下文,它发出所有 worker goroutines 退出的信号,并等待它们完成。
作为练习,扩展此代码以解决几个问题:
- 区分真正的错误和取消;在当前代码中,如果所有工作人员 运行 都陷入错误
,则可能会出现死锁
- 使用
select
. 向主 goroutine 中读取的 <- responses
添加超时
- 尽快将代码写入 return 第一个结果给调用者,而后台 goroutine 可以处理取消上下文并等待 workers 退出。毕竟,这里的主要目标是 return 快速获得结果。
你看错书了。本书使用示例来说明如何使用缓冲通道来避免 goroutine 泄漏。
这是紧接着书中示例(第 233 页)的段落:
Had we used an unbuffered channel, the two slower goroutines would
have gotten stuck trying to send their responses on a channel from
which no goroutine will ever receive. This situation, called a
goroutine leak, would be a bug. Unlike garbage variables, leaked
goroutines are not automatically collected, so it is important to make
sure that goroutines terminate themselves when no longer needed.
注:
- 此函数不会尝试优化内存占用或资源使用(包括网络资源)。 Go 的
net/http
包的客户端函数是上下文感知的,因此它可以在请求中间取消,这将节省一些资源(是否对问题很重要将是一个设计决定)。
要使用上下文,您可以:
func mirroredQuery() string {
responses := make(chan string, 3)
ctx, cf := context.WithCancel(context.Background())
defer cf()
go func() { responses <- request("asia.gopl.io") }()
go func() { responses <- request("europe.gopl.io") }()
go func() { responses <- request("americas.gopl.io") }()
return <-responses // return the quickest response
}
func request(ctx context.Context, url string) string {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
panic(err)
}
res, err := http.DefaultClient.Do(req)
if err == nil {
body, err := io.ReadAll(res.Body)
if err == nil {
return string(body)
} else {
return err.Error()
}
} else {
return err.Error()
}
}
- 使用缓冲通道分配内存。当 goroutine 太多时,使用缓冲通道太浪费了。
要解决这个问题,您可以使用频道(就像您尝试的那样):
func getAny() string {
responses := make(chan string)
ctx, cf := context.WithCancel(context.Background())
defer cf()
done := make(chan struct{})
defer close(done)
doRequest := func(url string) {
select {
case responses <- request(ctx, url):
fmt.Printf("get %s\n", url)
case <-done:
fmt.Printf("stop %s\n", url)
return
}
}
go doRequest("http://google.com")
go doRequest("http://qq.com")
go doRequest("http://baidu.com")
return <-responses // return the quickest response
}
在关闭的频道上总是立即“returns”接收零值,因此用作广播。使用这种“完成通道”是常见的做法。
你也可以使用context.Context
:
func mirroredQuery() string {
responses := make(chan string)
ctx, cf := context.WithCancel(context.Background())
defer cf()
doRequest := func(url string) {
select {
case responses <- request(ctx, url):
fmt.Printf("get %s\n", url)
case <-ctx.Done():
fmt.Printf("stop %s\n", url)
return
}
}
go doRequest("http://google.com")
go doRequest("http://qq.com")
go doRequest("http://baidu.com")
return <-responses // return the quickest response
}
在这种情况下更好,因为您已经使用了带有 http 的 context.Context
。
- 使用
sync.WorkGroup
会等待所有请求完成,但 returns 第一个请求。我认为这违背了该功能的目的,并且几乎没有任何好处。而且我不认为在函数本身 returns 之前使所有 goroutines 产生函数 returns 是有意义的(除非该函数是主要函数)。
提供的代码中没有 goroutine 泄漏。 mirroredQuery
方法使用缓冲通道收集结果和 return 第一个答案。当前缓冲区有足够的 space 来收集所有 goroutines 的所有答案,即使从未读取其余响应。如果缓冲区小于 N - 1,情况就会改变,其中 N 是生成的 goroutine 的数量。在这种情况下,mirroredQuery
生成的一些 goroutine 将在尝试向 responses
通道发送响应时卡住。重复调用 mirroredQuery
会导致卡住的 goroutines 增加,这可以称为 goroutines leak。
这是添加了日志的代码以及两种情况的输出。
func mirroredQuery() string {
responses := make(chan string, 2)
go func() {
responses <- request("asia.gopl.io")
log.Printf("Finished goroutine asia.gopl.io\n")
}()
go func() {
responses <- request("europe.gopl.io")
log.Printf("Finished goroutine europe.gopl.io\n")
}()
go func() {
responses <- request("americas.gopl.io")
log.Printf("Finished goroutine americas.gopl.io\n")
}()
return <-responses // return the quickest response
}
func request(hostname string) (response string) {
duration := time.Duration(rand.Int63n(5000)) * time.Millisecond
time.Sleep(duration)
return hostname
}
func main() {
rand.Seed(time.Now().UnixNano())
result := mirroredQuery()
log.Printf("Fastest result for %s\n", result)
time.Sleep(6*time.Second)
}
缓冲区大小的输出 >= N-1
2021/06/26 16:05:27 Finished europe.gopl.io
2021/06/26 16:05:27 Fastest result for europe.gopl.io
2021/06/26 16:05:28 Finished asia.gopl.io
2021/06/26 16:05:30 Finished americas.gopl.io
Process finished with the exit code 0
缓冲区大小 < N-1 的输出
2021/06/26 15:47:54 Finished europe.gopl.io
2021/06/26 15:47:54 Fastest result for europe.gopl.io
Process finished with the exit code 0
可以通过在第一个响应到达时引入 goroutines 终止来“改进”上述实现。这可能会减少使用的资源数量。这在很大程度上取决于 request
方法的作用。对于计算量大的场景这是有意义的,因为取消 http 请求可能会导致连接终止,因此下一个请求必须打开新的请求。对于高负载的服务器,即使不使用响应,它也可能不如等待响应有效。
下面是使用 context
用法的改进实现。
func mirroredQuery() string {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
responses := make(chan string)
f := func(hostname string) {
response, err := request(ctx, hostname)
if err != nil {
log.Printf("Finished %s with error %s\n", hostname, err)
return
}
responses <- response
log.Printf("Finished %s\n", hostname)
}
go f("asia.gopl.io")
go f("europe.gopl.io")
go f("americas.gopl.io")
return <-responses // return the quickest response
}
func request(ctx context.Context, hostname string) (string, error) {
duration := time.Duration(rand.Int63n(5000)) * time.Millisecond
after := time.After(duration)
select {
case <-ctx.Done():
return "", ctx.Err()
case <-after:
return "response for "+hostname, nil
}
}
func main() {
rand.Seed(time.Now().UnixNano())
result := mirroredQuery()
log.Printf("Fastest result for %s\n", result)
time.Sleep(6 * time.Second)
}
我正在阅读 The Go Programming Language 一书,书中有一个演示 goroutine 泄漏的示例
func mirroredQuery() string {
responses := make(chan string, 3)
go func() { responses <- request("asia.gopl.io") }()
go func() { responses <- request("europe.gopl.io") }()
go func() { responses <- request("americas.gopl.io") }()
return <-responses // return the quickest response
}
func request(hostname string) (response string) { /* ... */ }
并且我已经尝试解决了漏洞,得到了如下代码
func request(url string) string {
res, err := http.Get(url)
if err == nil {
body, err := io.ReadAll(res.Body)
if err == nil {
return string(body)
} else {
return err.Error()
}
} else {
return err.Error()
}
}
func getany() string {
rsp := make(chan string, 3)
done := make(chan struct{}, 3)
doRequest := func(url string) {
select {
case rsp <- request(url):
fmt.Printf("get %s\n", url)
done <- struct{}{}
case <- done:
fmt.Printf("stop %s\n", url)
return
}
}
go doRequest("http://google.com")
go doRequest("http://qq.com")
go doRequest("http://baidu.com")
return <-rsp
}
但是好像没有解决问题?有什么建议吗?
使用上下文和sync.WaitGroup
为了避免 goroutine 泄漏,您可能希望确保一旦您从 mirroredQuery
中 return,最初在此函数中创建的 goroutine 不会保留 运行ning?
在那种情况下,最重要的是能够 取消 其他 goroutines 当其中一个 goroutines 成功完成请求时。这种取消是在 Go 中使用 context.Context
实现的,net/http
支持。
一旦你有上下文取消,你需要在你的 main 函数中有一个 sync.WaitGroup
来等待所有的 goroutines 成为 Done
.
这是一个 doRequest
,它使用上下文并包装了本书 request
函数的“HTTP get”功能:
func doRequest(ctx context.Context, url string) string {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
log.Fatal(err)
}
res, err := http.DefaultClient.Do(req)
// err will be non-nil also if the request was canceled
if err != nil {
return ""
}
defer res.Body.Close()
b, err := io.ReadAll(res.Body)
if err != nil {
return ""
}
return string(b)
}
如果上下文被取消,http.DefaultClient.Do
将提前 return,并出现适当的错误。
现在,处理 goroutine 的函数变成了:
func mirroredQuery() string {
ctx, cancel := context.WithCancel(context.Background())
responses := make(chan string, 3)
fetcher := func(url string, wg *sync.WaitGroup) {
res := doRequest(ctx, url)
if res != "" {
responses <- res
}
wg.Done()
}
urls := []string{
"asia.gopl.io",
"europe.gopl.io",
"http://google.com",
}
var wg sync.WaitGroup
for _, url := range urls {
wg.Add(1)
go fetcher(url, &wg)
}
res := <-responses
fmt.Println("got response", res[:300])
cancel()
wg.Wait()
return res
}
注意几点:
- 每个 goroutine 运行s
doRequest
并且只在结果非空时将结果写入responses
(意味着没有发生错误;取消在这里算作错误) WaitGroup
用于等待所有worker goroutines退出- 主 goroutine 启动所有 worker 然后等待
responses
中的第一个(非空)结果;然后它调用cancel
取消上下文,它发出所有 worker goroutines 退出的信号,并等待它们完成。
作为练习,扩展此代码以解决几个问题:
- 区分真正的错误和取消;在当前代码中,如果所有工作人员 运行 都陷入错误 ,则可能会出现死锁
- 使用
select
. 向主 goroutine 中读取的 - 尽快将代码写入 return 第一个结果给调用者,而后台 goroutine 可以处理取消上下文并等待 workers 退出。毕竟,这里的主要目标是 return 快速获得结果。
<- responses
添加超时
你看错书了。本书使用示例来说明如何使用缓冲通道来避免 goroutine 泄漏。
这是紧接着书中示例(第 233 页)的段落:
Had we used an unbuffered channel, the two slower goroutines would have gotten stuck trying to send their responses on a channel from which no goroutine will ever receive. This situation, called a goroutine leak, would be a bug. Unlike garbage variables, leaked goroutines are not automatically collected, so it is important to make sure that goroutines terminate themselves when no longer needed.
注:
- 此函数不会尝试优化内存占用或资源使用(包括网络资源)。 Go 的
net/http
包的客户端函数是上下文感知的,因此它可以在请求中间取消,这将节省一些资源(是否对问题很重要将是一个设计决定)。
要使用上下文,您可以:
func mirroredQuery() string {
responses := make(chan string, 3)
ctx, cf := context.WithCancel(context.Background())
defer cf()
go func() { responses <- request("asia.gopl.io") }()
go func() { responses <- request("europe.gopl.io") }()
go func() { responses <- request("americas.gopl.io") }()
return <-responses // return the quickest response
}
func request(ctx context.Context, url string) string {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
panic(err)
}
res, err := http.DefaultClient.Do(req)
if err == nil {
body, err := io.ReadAll(res.Body)
if err == nil {
return string(body)
} else {
return err.Error()
}
} else {
return err.Error()
}
}
- 使用缓冲通道分配内存。当 goroutine 太多时,使用缓冲通道太浪费了。
要解决这个问题,您可以使用频道(就像您尝试的那样):
func getAny() string {
responses := make(chan string)
ctx, cf := context.WithCancel(context.Background())
defer cf()
done := make(chan struct{})
defer close(done)
doRequest := func(url string) {
select {
case responses <- request(ctx, url):
fmt.Printf("get %s\n", url)
case <-done:
fmt.Printf("stop %s\n", url)
return
}
}
go doRequest("http://google.com")
go doRequest("http://qq.com")
go doRequest("http://baidu.com")
return <-responses // return the quickest response
}
在关闭的频道上总是立即“returns”接收零值,因此用作广播。使用这种“完成通道”是常见的做法。
你也可以使用context.Context
:
func mirroredQuery() string {
responses := make(chan string)
ctx, cf := context.WithCancel(context.Background())
defer cf()
doRequest := func(url string) {
select {
case responses <- request(ctx, url):
fmt.Printf("get %s\n", url)
case <-ctx.Done():
fmt.Printf("stop %s\n", url)
return
}
}
go doRequest("http://google.com")
go doRequest("http://qq.com")
go doRequest("http://baidu.com")
return <-responses // return the quickest response
}
在这种情况下更好,因为您已经使用了带有 http 的 context.Context
。
- 使用
sync.WorkGroup
会等待所有请求完成,但 returns 第一个请求。我认为这违背了该功能的目的,并且几乎没有任何好处。而且我不认为在函数本身 returns 之前使所有 goroutines 产生函数 returns 是有意义的(除非该函数是主要函数)。
提供的代码中没有 goroutine 泄漏。 mirroredQuery
方法使用缓冲通道收集结果和 return 第一个答案。当前缓冲区有足够的 space 来收集所有 goroutines 的所有答案,即使从未读取其余响应。如果缓冲区小于 N - 1,情况就会改变,其中 N 是生成的 goroutine 的数量。在这种情况下,mirroredQuery
生成的一些 goroutine 将在尝试向 responses
通道发送响应时卡住。重复调用 mirroredQuery
会导致卡住的 goroutines 增加,这可以称为 goroutines leak。
这是添加了日志的代码以及两种情况的输出。
func mirroredQuery() string {
responses := make(chan string, 2)
go func() {
responses <- request("asia.gopl.io")
log.Printf("Finished goroutine asia.gopl.io\n")
}()
go func() {
responses <- request("europe.gopl.io")
log.Printf("Finished goroutine europe.gopl.io\n")
}()
go func() {
responses <- request("americas.gopl.io")
log.Printf("Finished goroutine americas.gopl.io\n")
}()
return <-responses // return the quickest response
}
func request(hostname string) (response string) {
duration := time.Duration(rand.Int63n(5000)) * time.Millisecond
time.Sleep(duration)
return hostname
}
func main() {
rand.Seed(time.Now().UnixNano())
result := mirroredQuery()
log.Printf("Fastest result for %s\n", result)
time.Sleep(6*time.Second)
}
缓冲区大小的输出 >= N-1
2021/06/26 16:05:27 Finished europe.gopl.io
2021/06/26 16:05:27 Fastest result for europe.gopl.io
2021/06/26 16:05:28 Finished asia.gopl.io
2021/06/26 16:05:30 Finished americas.gopl.io
Process finished with the exit code 0
缓冲区大小 < N-1 的输出
2021/06/26 15:47:54 Finished europe.gopl.io
2021/06/26 15:47:54 Fastest result for europe.gopl.io
Process finished with the exit code 0
可以通过在第一个响应到达时引入 goroutines 终止来“改进”上述实现。这可能会减少使用的资源数量。这在很大程度上取决于 request
方法的作用。对于计算量大的场景这是有意义的,因为取消 http 请求可能会导致连接终止,因此下一个请求必须打开新的请求。对于高负载的服务器,即使不使用响应,它也可能不如等待响应有效。
下面是使用 context
用法的改进实现。
func mirroredQuery() string {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
responses := make(chan string)
f := func(hostname string) {
response, err := request(ctx, hostname)
if err != nil {
log.Printf("Finished %s with error %s\n", hostname, err)
return
}
responses <- response
log.Printf("Finished %s\n", hostname)
}
go f("asia.gopl.io")
go f("europe.gopl.io")
go f("americas.gopl.io")
return <-responses // return the quickest response
}
func request(ctx context.Context, hostname string) (string, error) {
duration := time.Duration(rand.Int63n(5000)) * time.Millisecond
after := time.After(duration)
select {
case <-ctx.Done():
return "", ctx.Err()
case <-after:
return "response for "+hostname, nil
}
}
func main() {
rand.Seed(time.Now().UnixNano())
result := mirroredQuery()
log.Printf("Fastest result for %s\n", result)
time.Sleep(6 * time.Second)
}