惯用的 goroutine 终止和错误处理
Idiomatic goroutine termination and error handling
我在 go 中有一个简单的并发用例,我想不出一个优雅的解决方案来解决我的问题。
我想编写一个方法fetchAll
,从远程服务器并行查询数量不详的资源。如果任何提取失败,我想 return 立即解决第一个错误。
我的初始实现泄漏了 goroutines:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func fetchAll() error {
wg := sync.WaitGroup{}
errs := make(chan error)
leaks := make(map[int]struct{})
defer fmt.Println("these goroutines leaked:", leaks)
// run all the http requests in parallel
for i := 0; i < 4; i++ {
leaks[i] = struct{}{}
wg.Add(1)
go func(i int) {
defer wg.Done()
defer delete(leaks, i)
// pretend this does an http request and returns an error
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
errs <- fmt.Errorf("goroutine %d's error returned", i)
}(i)
}
// wait until all the fetches are done and close the error
// channel so the loop below terminates
go func() {
wg.Wait()
close(errs)
}()
// return the first error
for err := range errs {
if err != nil {
return err
}
}
return nil
}
func main() {
fmt.Println(fetchAll())
}
游乐场:https://play.golang.org/p/Be93J514R5
我通过阅读 https://blog.golang.org/pipelines 知道我可以创建一个信号通道来清理其他线程。或者,我可能可以使用 context
来完成它。但似乎这样一个简单的用例应该有一个我所缺少的更简单的解决方案。
只要每个 goroutine 完成,你就不会泄漏任何东西。您应该创建缓冲的错误通道,缓冲区大小等于 goroutine 的数量,以便通道上的发送操作不会阻塞。每个 goroutine 应该总是在完成时在通道上发送一些东西,无论它是成功还是失败。然后底部的循环可以只迭代 goroutines 的数量和 return 如果它得到一个非零错误。您不需要关闭通道的 WaitGroup 或其他 goroutine。
我认为 goroutines 出现泄漏的原因是你 return 当你得到第一个错误时,所以其中一些仍然 运行.
顺便说一句,地图不是 goroutine 安全的。如果你在 goroutines 之间共享一个 map 并且其中一些 goroutines 正在对 map 进行更改,你需要使用互斥锁来保护它。
除了一个 goroutine 外,所有 goroutine 都已泄漏,因为它们仍在等待发送到 errs 通道 - 你永远不会完成清空它的 for-range。您还泄露了 goroutine 的工作是关闭错误通道,因为等待组永远不会完成。
(另外,正如 Andy 指出的那样,从映射中删除不是线程安全的,因此需要互斥锁的保护。)
但是,我认为映射、互斥锁、等待组、上下文等在这里甚至都不是必需的。我将重写整个内容以仅使用基本的通道操作,如下所示:
package main
import (
"fmt"
"math/rand"
"time"
)
func fetchAll() error {
var N = 4
quit := make(chan bool)
errc := make(chan error)
done := make(chan error)
for i := 0; i < N; i++ {
go func(i int) {
// dummy fetch
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
err := error(nil)
if rand.Intn(2) == 0 {
err = fmt.Errorf("goroutine %d's error returned", i)
}
ch := done // we'll send to done if nil error and to errc otherwise
if err != nil {
ch = errc
}
select {
case ch <- err:
return
case <-quit:
return
}
}(i)
}
count := 0
for {
select {
case err := <-errc:
close(quit)
return err
case <-done:
count++
if count == N {
return nil // got all N signals, so there was no error
}
}
}
}
func main() {
rand.Seed(time.Now().UnixNano())
fmt.Println(fetchAll())
}
游乐场link:https://play.golang.org/p/mxGhSYYkOb
编辑:确实有一个愚蠢的错误,感谢您指出。我修复了上面的代码(我认为......)。我还添加了一些随机性以增加 Realism™。
另外,我想强调的是,确实有多种方法可以解决这个问题,而我的解决方案只是一种方法。最终归结为个人品味,但总的来说,您想努力实现 "idiomatic" 代码 - 以及一种让您感觉自然且易于理解的风格。
使用 Error Group 使这更简单。这会自动等待所有提供的 Go Routines 成功完成,或者在任何一个例程返回错误的情况下取消所有剩余的例程(在这种情况下,该错误是返回给调用者的一个气泡)。
package main
import (
"context"
"fmt"
"math/rand"
"time"
"golang.org/x/sync/errgroup"
)
func fetchAll(ctx context.Context) error {
errs, ctx := errgroup.WithContext(ctx)
// run all the http requests in parallel
for i := 0; i < 4; i++ {
errs.Go(func() error {
// pretend this does an http request and returns an error
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return fmt.Errorf("error in go routine, bailing")
})
}
// Wait for completion and return the first error (if any)
return errs.Wait()
}
func main() {
fmt.Println(fetchAll(context.Background()))
}
这是一个使用 errgroup suggested by 的更完整的示例。显示处理成功的数据,第一次出错会退出。
https://play.golang.org/p/rU1v-Mp2ijo
package main
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
"math/rand"
"time"
)
func fetchAll() error {
g, ctx := errgroup.WithContext(context.Background())
results := make(chan int)
for i := 0; i < 4; i++ {
current := i
g.Go(func() error {
// Simulate delay with random errors.
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
if rand.Intn(2) == 0 {
return fmt.Errorf("goroutine %d's error returned", current)
}
// Pass processed data to channel, or receive a context completion.
select {
case results <- current:
return nil
// Close out if another error occurs.
case <-ctx.Done():
return ctx.Err()
}
})
}
// Elegant way to close out the channel when the first error occurs or
// when processing is successful.
go func() {
g.Wait()
close(results)
}()
for result := range results {
fmt.Println("processed", result)
}
// Wait for all fetches to complete.
return g.Wait()
}
func main() {
fmt.Println(fetchAll())
}
此答案包括将响应返回 doneData
-
的能力
package main
import (
"fmt"
"math/rand"
"os"
"strconv"
)
var doneData []string // responses
func fetchAll(n int, doneCh chan bool, errCh chan error) {
partialDoneCh := make(chan string)
for i := 0; i < n; i++ {
go func(i int) {
if r := rand.Intn(100); r != 0 && r%10 == 0 {
// simulate an error
errCh <- fmt.Errorf("e33or for reqno=" + strconv.Itoa(r))
} else {
partialDoneCh <- strconv.Itoa(i)
}
}(i)
}
// mutation of doneData
for d := range partialDoneCh {
doneData = append(doneData, d)
if len(doneData) == n {
close(partialDoneCh)
doneCh <- true
}
}
}
func main() {
// rand.Seed(1)
var n int
var e error
if len(os.Args) > 1 {
if n, e = strconv.Atoi(os.Args[1]); e != nil {
panic(e)
}
} else {
n = 5
}
doneCh := make(chan bool)
errCh := make(chan error)
go fetchAll(n, doneCh, errCh)
fmt.Println("main: end")
select {
case <-doneCh:
fmt.Println("success:", doneData)
case e := <-errCh:
fmt.Println("failure:", e, doneData)
}
}
Execute using go run filename.go 50
where N=50 i.e amount of parallelism
我在 go 中有一个简单的并发用例,我想不出一个优雅的解决方案来解决我的问题。
我想编写一个方法fetchAll
,从远程服务器并行查询数量不详的资源。如果任何提取失败,我想 return 立即解决第一个错误。
我的初始实现泄漏了 goroutines:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func fetchAll() error {
wg := sync.WaitGroup{}
errs := make(chan error)
leaks := make(map[int]struct{})
defer fmt.Println("these goroutines leaked:", leaks)
// run all the http requests in parallel
for i := 0; i < 4; i++ {
leaks[i] = struct{}{}
wg.Add(1)
go func(i int) {
defer wg.Done()
defer delete(leaks, i)
// pretend this does an http request and returns an error
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
errs <- fmt.Errorf("goroutine %d's error returned", i)
}(i)
}
// wait until all the fetches are done and close the error
// channel so the loop below terminates
go func() {
wg.Wait()
close(errs)
}()
// return the first error
for err := range errs {
if err != nil {
return err
}
}
return nil
}
func main() {
fmt.Println(fetchAll())
}
游乐场:https://play.golang.org/p/Be93J514R5
我通过阅读 https://blog.golang.org/pipelines 知道我可以创建一个信号通道来清理其他线程。或者,我可能可以使用 context
来完成它。但似乎这样一个简单的用例应该有一个我所缺少的更简单的解决方案。
只要每个 goroutine 完成,你就不会泄漏任何东西。您应该创建缓冲的错误通道,缓冲区大小等于 goroutine 的数量,以便通道上的发送操作不会阻塞。每个 goroutine 应该总是在完成时在通道上发送一些东西,无论它是成功还是失败。然后底部的循环可以只迭代 goroutines 的数量和 return 如果它得到一个非零错误。您不需要关闭通道的 WaitGroup 或其他 goroutine。
我认为 goroutines 出现泄漏的原因是你 return 当你得到第一个错误时,所以其中一些仍然 运行.
顺便说一句,地图不是 goroutine 安全的。如果你在 goroutines 之间共享一个 map 并且其中一些 goroutines 正在对 map 进行更改,你需要使用互斥锁来保护它。
除了一个 goroutine 外,所有 goroutine 都已泄漏,因为它们仍在等待发送到 errs 通道 - 你永远不会完成清空它的 for-range。您还泄露了 goroutine 的工作是关闭错误通道,因为等待组永远不会完成。
(另外,正如 Andy 指出的那样,从映射中删除不是线程安全的,因此需要互斥锁的保护。)
但是,我认为映射、互斥锁、等待组、上下文等在这里甚至都不是必需的。我将重写整个内容以仅使用基本的通道操作,如下所示:
package main
import (
"fmt"
"math/rand"
"time"
)
func fetchAll() error {
var N = 4
quit := make(chan bool)
errc := make(chan error)
done := make(chan error)
for i := 0; i < N; i++ {
go func(i int) {
// dummy fetch
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
err := error(nil)
if rand.Intn(2) == 0 {
err = fmt.Errorf("goroutine %d's error returned", i)
}
ch := done // we'll send to done if nil error and to errc otherwise
if err != nil {
ch = errc
}
select {
case ch <- err:
return
case <-quit:
return
}
}(i)
}
count := 0
for {
select {
case err := <-errc:
close(quit)
return err
case <-done:
count++
if count == N {
return nil // got all N signals, so there was no error
}
}
}
}
func main() {
rand.Seed(time.Now().UnixNano())
fmt.Println(fetchAll())
}
游乐场link:https://play.golang.org/p/mxGhSYYkOb
编辑:确实有一个愚蠢的错误,感谢您指出。我修复了上面的代码(我认为......)。我还添加了一些随机性以增加 Realism™。
另外,我想强调的是,确实有多种方法可以解决这个问题,而我的解决方案只是一种方法。最终归结为个人品味,但总的来说,您想努力实现 "idiomatic" 代码 - 以及一种让您感觉自然且易于理解的风格。
使用 Error Group 使这更简单。这会自动等待所有提供的 Go Routines 成功完成,或者在任何一个例程返回错误的情况下取消所有剩余的例程(在这种情况下,该错误是返回给调用者的一个气泡)。
package main
import (
"context"
"fmt"
"math/rand"
"time"
"golang.org/x/sync/errgroup"
)
func fetchAll(ctx context.Context) error {
errs, ctx := errgroup.WithContext(ctx)
// run all the http requests in parallel
for i := 0; i < 4; i++ {
errs.Go(func() error {
// pretend this does an http request and returns an error
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return fmt.Errorf("error in go routine, bailing")
})
}
// Wait for completion and return the first error (if any)
return errs.Wait()
}
func main() {
fmt.Println(fetchAll(context.Background()))
}
这是一个使用 errgroup suggested by
https://play.golang.org/p/rU1v-Mp2ijo
package main
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
"math/rand"
"time"
)
func fetchAll() error {
g, ctx := errgroup.WithContext(context.Background())
results := make(chan int)
for i := 0; i < 4; i++ {
current := i
g.Go(func() error {
// Simulate delay with random errors.
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
if rand.Intn(2) == 0 {
return fmt.Errorf("goroutine %d's error returned", current)
}
// Pass processed data to channel, or receive a context completion.
select {
case results <- current:
return nil
// Close out if another error occurs.
case <-ctx.Done():
return ctx.Err()
}
})
}
// Elegant way to close out the channel when the first error occurs or
// when processing is successful.
go func() {
g.Wait()
close(results)
}()
for result := range results {
fmt.Println("processed", result)
}
// Wait for all fetches to complete.
return g.Wait()
}
func main() {
fmt.Println(fetchAll())
}
此答案包括将响应返回 doneData
-
package main
import (
"fmt"
"math/rand"
"os"
"strconv"
)
var doneData []string // responses
func fetchAll(n int, doneCh chan bool, errCh chan error) {
partialDoneCh := make(chan string)
for i := 0; i < n; i++ {
go func(i int) {
if r := rand.Intn(100); r != 0 && r%10 == 0 {
// simulate an error
errCh <- fmt.Errorf("e33or for reqno=" + strconv.Itoa(r))
} else {
partialDoneCh <- strconv.Itoa(i)
}
}(i)
}
// mutation of doneData
for d := range partialDoneCh {
doneData = append(doneData, d)
if len(doneData) == n {
close(partialDoneCh)
doneCh <- true
}
}
}
func main() {
// rand.Seed(1)
var n int
var e error
if len(os.Args) > 1 {
if n, e = strconv.Atoi(os.Args[1]); e != nil {
panic(e)
}
} else {
n = 5
}
doneCh := make(chan bool)
errCh := make(chan error)
go fetchAll(n, doneCh, errCh)
fmt.Println("main: end")
select {
case <-doneCh:
fmt.Println("success:", doneData)
case e := <-errCh:
fmt.Println("failure:", e, doneData)
}
}
Execute using
go run filename.go 50
where N=50 i.e amount of parallelism