并发处理程序正在阻塞
Concurrent handler is blocking
我们发现一个 mqtt.MessageHandler
无法正常工作。在处理程序中,我们将过滤即将到来的消息,然后将有效事件传递给一个函数进行处理。 func实现如下:
func processEvent(i models.Foo) (string, error) {
var wg sync.WaitGroup
quit := make(chan bool)
errc := make(chan error)
done := make(chan error)
err := func1()
if err != nil {
return err
}
switch strings.ToUpper(i.Status) {
case "OK":
wg.Add(1)
go func() {
defer wg.Done()
err = longTimeTask1()
ch := done
if err != nil {
log.Error("%s", err.Error())
ch = errc
}
select {
case ch <- err:
return
case <-quit:
return
}
}()
wg.Add(1)
go func() {
defer wg.Done()
err = longTimeTask2()
ch := done
if err != nil {
ch = errc
}
select {
case ch <- err:
return
case <-quit:
return
}
}()
result := "processed"
count := 0
for {
select {
case err := <-errc:
close(quit)
log.Info("event: %s, %s", "", err.Error())
return "", err
case <-done:
count++
if count == 4 { // why 4???
return result, nil
}
}
}
wg.Wait()
if err != nil {
log.Info("event: %s, %s", result, err.Error())
return result, err
}
close(quit)
close(errc)
close(done)
return result, nil
default:
return "", nil
}
return "", nil
}
我明白了,它正在尝试同步 longTimeTask1()
和 longTimeTask2()。但对我来说理解起来很复杂。 count 和 count == 4 的目的是什么?为什么最后收盘?代码提示无法访问 wg.Wait()
。
在此功能运行良好之前。但最近 longTimeTask1()
或 longTimeTask2()
可能会 return 一些错误,这会破坏代码,这个函数似乎被完全阻止了。你能帮我理解代码并找到潜在的问题并重构这部分吗?
查看 count
,代码似乎希望从 done
通道接收四条消息。然而,这段代码最多可以从两个 goroutines 中产生两条这样的消息,所以这是一个错误。
此外,如果任何 goroutines returns 出错,它不会写入 done
通道,所以这是另一个错误。
另一种写法可能是:
...
result := "processed"
for {
select {
case err := <-errc:
close(quit) // Tell the goroutines to terminate
log.Info("event: %s, %s", "", err.Error())
wg.Wait() // Wait for them to finish
return "", err
case <-done:
count++
if count == 2 {
wg.Wait()
return result, nil
}
}
这正是 errgroup
包设计用于的那种分叉和连接并发:
func processEvent(ctx context.Context, i models.Foo) (string, error) {
err := func1()
if err != nil {
return "", err
}
g, ctx := errgroup.WithContext(ctx)
if strings.ToUpper(i.Status) != "OK" {
return "", nil
}
g.Go(func() error { return longTimeTask1(ctx) })
g.Go(func() error { return longTimeTask2(ctx) })
if err := g.Wait(); err != nil {
log.Printf("event: %v", err)
return "", err
}
return "processed", nil
}
我们发现一个 mqtt.MessageHandler
无法正常工作。在处理程序中,我们将过滤即将到来的消息,然后将有效事件传递给一个函数进行处理。 func实现如下:
func processEvent(i models.Foo) (string, error) {
var wg sync.WaitGroup
quit := make(chan bool)
errc := make(chan error)
done := make(chan error)
err := func1()
if err != nil {
return err
}
switch strings.ToUpper(i.Status) {
case "OK":
wg.Add(1)
go func() {
defer wg.Done()
err = longTimeTask1()
ch := done
if err != nil {
log.Error("%s", err.Error())
ch = errc
}
select {
case ch <- err:
return
case <-quit:
return
}
}()
wg.Add(1)
go func() {
defer wg.Done()
err = longTimeTask2()
ch := done
if err != nil {
ch = errc
}
select {
case ch <- err:
return
case <-quit:
return
}
}()
result := "processed"
count := 0
for {
select {
case err := <-errc:
close(quit)
log.Info("event: %s, %s", "", err.Error())
return "", err
case <-done:
count++
if count == 4 { // why 4???
return result, nil
}
}
}
wg.Wait()
if err != nil {
log.Info("event: %s, %s", result, err.Error())
return result, err
}
close(quit)
close(errc)
close(done)
return result, nil
default:
return "", nil
}
return "", nil
}
我明白了,它正在尝试同步 longTimeTask1()
和 longTimeTask2()。但对我来说理解起来很复杂。 count 和 count == 4 的目的是什么?为什么最后收盘?代码提示无法访问 wg.Wait()
。
在此功能运行良好之前。但最近 longTimeTask1()
或 longTimeTask2()
可能会 return 一些错误,这会破坏代码,这个函数似乎被完全阻止了。你能帮我理解代码并找到潜在的问题并重构这部分吗?
查看 count
,代码似乎希望从 done
通道接收四条消息。然而,这段代码最多可以从两个 goroutines 中产生两条这样的消息,所以这是一个错误。
此外,如果任何 goroutines returns 出错,它不会写入 done
通道,所以这是另一个错误。
另一种写法可能是:
...
result := "processed"
for {
select {
case err := <-errc:
close(quit) // Tell the goroutines to terminate
log.Info("event: %s, %s", "", err.Error())
wg.Wait() // Wait for them to finish
return "", err
case <-done:
count++
if count == 2 {
wg.Wait()
return result, nil
}
}
这正是 errgroup
包设计用于的那种分叉和连接并发:
func processEvent(ctx context.Context, i models.Foo) (string, error) {
err := func1()
if err != nil {
return "", err
}
g, ctx := errgroup.WithContext(ctx)
if strings.ToUpper(i.Status) != "OK" {
return "", nil
}
g.Go(func() error { return longTimeTask1(ctx) })
g.Go(func() error { return longTimeTask2(ctx) })
if err := g.Wait(); err != nil {
log.Printf("event: %v", err)
return "", err
}
return "processed", nil
}