并发处理程序正在阻塞

Concurrent handler is blocking

我们发现一个 mqtt.MessageHandler 无法正常工作。在处理程序中,我们将过滤即将到来的消息,然后将有效事件传递给一个函数进行处理。 func实现如下:

func processEvent(i models.Foo) (string, error) {
    var wg sync.WaitGroup
    quit := make(chan bool)
    errc := make(chan error)
    done := make(chan error)

    err := func1()
    if err != nil {
        return err
    }

    switch strings.ToUpper(i.Status) {
    case "OK":
        wg.Add(1)
        go func() {
            defer wg.Done()
            err = longTimeTask1()
            ch := done
            if err != nil {
                log.Error("%s", err.Error())
                ch = errc
            }
            select {
            case ch <- err:
                return
            case <-quit:
                return
            }
        }()

        wg.Add(1)
        go func() {
            defer wg.Done()
            err = longTimeTask2()
            ch := done
            if err != nil {
                ch = errc
            }
            select {
            case ch <- err:
                return
            case <-quit:
                return
            }
        }()

        result := "processed"
        count := 0
        for {
            select {
            case err := <-errc:
                close(quit)
                log.Info("event: %s, %s", "", err.Error())
                return "", err
            case <-done:
                count++
                if count == 4 { // why 4???
                    return result, nil
                }
            }
        }

        wg.Wait()

        if err != nil {
            log.Info("event: %s, %s", result, err.Error())
            return result, err
        }
        close(quit)
        close(errc)
        close(done)
        return result, nil
    default:
        return "", nil
    }

    return "", nil
}

我明白了,它正在尝试同步 longTimeTask1() 和 longTimeTask2()。但对我来说理解起来很复杂。 count 和 count == 4 的目的是什么?为什么最后收盘?代码提示无法访问 wg.Wait()。 在此功能运行良好之前。但最近 longTimeTask1()longTimeTask2() 可能会 return 一些错误,这会破坏代码,这个函数似乎被完全阻止了。你能帮我理解代码并找到潜在的问题并重构这部分吗?

查看 count,代码似乎希望从 done 通道接收四条消息。然而,这段代码最多可以从两个 goroutines 中产生两条这样的消息,所以这是一个错误。

此外,如果任何 goroutines returns 出错,它不会写入 done 通道,所以这是另一个错误。

另一种写法可能是:

...
result := "processed"
for {
    select {
       case err := <-errc:
          close(quit) // Tell the goroutines to terminate
          log.Info("event: %s, %s", "", err.Error())
          wg.Wait() // Wait for them to finish
          return "", err
  
       case <-done:
          count++
          if count == 2 {
              wg.Wait()
              return result, nil
          }    
}

这正是 errgroup 包设计用于的那种分叉和连接并发:

func processEvent(ctx context.Context, i models.Foo) (string, error) {
    err := func1()
    if err != nil {
        return "", err
    }

    g, ctx := errgroup.WithContext(ctx)

    if strings.ToUpper(i.Status) != "OK" {
        return "", nil
    }

    g.Go(func() error { return longTimeTask1(ctx) })
    g.Go(func() error { return longTimeTask2(ctx) })

    if err := g.Wait(); err != nil {
        log.Printf("event: %v", err)
        return "", err
    }
    return "processed", nil
}

(https://play.golang.org/p/JNMKftQTLGs)