将上下文与取消一起使用,Go 例程不会终止

Using context with cancel, Go routine doesn't terminate

我是 Go 和 Go 并发的新手。一旦找到具有给定 ID 的成员,我将尝试使用 Go 上下文取消一组 Go 例程。

一个群组存储一个客户列表,每个客户都有一个成员列表。我想同时搜索所有客户及其所有成员,以找到具有给定 ID 的成员。一旦找到这个成员,我想取消所有其他 Go 例程和 return 发现的成员。

我尝试了以下实现,使用 context.WithCancel 和 WaitGroup。

但这不起作用,并且无限期挂起,永远不会超过 waitGroup.Wait() 行,但我不确定为什么。

func (group *Group) MemberWithID(ID string) (*models.Member, error) {
    found := make(chan *models.Member)
    ctx := context.Background()
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    var waitGroup sync.WaitGroup

    for _, client := range group.Clients {
        waitGroup.Add(1)

        go func(clientToQuery Client) {
            defer waitGroup.Done()

            select {
            case <-ctx.Done():
                return
            default:
            }

            member, _ := client.ClientMemberWithID(ID)
            if member != nil {
                found <- member
                cancel()
                return
            }

        } (client)

    }

    waitGroup.Wait()

    if len(found) > 0 {
        return <-found, nil
    }

    return nil, fmt.Errorf("no member found with given id")
}

found 是一个无缓冲通道,因此在它上面发送会阻塞,直到有人准备好从它接收。

您的 main() 函数将是从它接收的函数,但仅在 waitGroup.Wait() return 秒之后。但这将阻塞,直到所有启动的 goroutines 调用 waitGroup.Done()。但这在他们 return 之前不会发生,而在他们可以发送 found 之前不会发生。这是一个僵局。

如果您将 found 更改为缓冲,即使 main() 尚未准备好从它接收值,也将允许在其上发送值(与缓冲区大小一样多的值)。

但是您应该在 waitGroup.Wait() return 秒之前从 found 收到。

另一种解决方案是为 found 使用 1 的缓冲区,并在 found 上使用非阻塞发送。这样第一个(最快的)goroutine 将能够发送结果,其余的(假设我们使用的是非阻塞发送)将简单地跳过发送。

另请注意,应该是 main() 调用 cancel(),而不是每个单独启动的 goroutines。

对于这种用例,我认为 sync.Once 可能比频道更合适。当你找到第一个非 nil 成员时,你想做两件不同的事情:

  1. 记录你找到的成员。
  2. 取消剩余的 goroutines。

缓冲通道可以轻松完成 (1),但会使 (2) 稍微复杂一些。但是 sync.Once 非常适合在第一次发生有趣的事情时做两件不同的事情!


我还建议汇总非平凡的错误,这样您就可以报告比 no member found 更有用的东西,例如,如果您的数据库连接失败或发生其他一些非平凡的错误。您也可以为此使用 sync.Once


综合起来,我希望看到这样的内容 (https://play.golang.org/p/QZXUUnbxOv5):

func (group *Group) MemberWithID(ctx context.Context, id string) (*Member, error) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    var (
        wg sync.WaitGroup

        member    *Member
        foundOnce sync.Once

        firstNontrivialErr error
        errOnce            sync.Once
    )

    for _, client := range group.Clients {
        wg.Add(1)
        client := client // https://golang.org/doc/faq#closures_and_goroutines
        go func() {
            defer wg.Done()

            m, err := client.ClientMemberWithID(ctx, id)
            if m != nil {
                foundOnce.Do(func() {
                    member = m
                    cancel()
                })
            } else if nf := (*MemberNotFoundError)(nil); !errors.As(err, &nf) {
                errOnce.Do(func() {
                    firstNontrivialErr = err
                })
            }
        }()
    }
    wg.Wait()

    if member == nil {
        if firstNontrivialErr != nil {
            return nil, firstNontrivialErr
        }
        return nil, &MemberNotFoundError{ID: id}
    }
    return member, nil
}