golang 使用 waitGroup 仍然以死锁错误告终

golang used waitGroup and still ended up with a deadlock error

我正在尝试根据 Go Concurrency 一书实现桥接模式

func bridge_impl() {
    done := make(chan interface{})
    defer close(done)
    var wg sync.WaitGroup
    bridge := func(
        done <-chan interface{},
        chanStream <-chan <-chan interface{},
    ) <-chan interface{} {
        valStream := make(chan interface{})
        go func() {
            wg.Add(1)
            defer close(valStream)
            for {
                var stream <-chan interface{}
                select {
                case maybeStream, ok := <-chanStream:
                    fmt.Println("works")
                    if ok == false {
                        return
                    }
                    stream = maybeStream
                case <-done:
                    return
                }
                for val := range stream {
                    select {
                    case valStream <- val:
                    case <-done:
                    }
                }
            }
        }()
        return valStream
    }
    genVals := func() <-chan <-chan interface{} {
        chanStream := make(chan (<-chan interface{}))
        go func() {
            wg.Add(1)
            defer close(chanStream)
            for i := 0; i < 10; i++ {
                stream := make(chan interface{})
                stream <- i
                close(stream)
                chanStream <- stream
            }
        }()
        return chanStream
    }
    for v := range bridge(done, genVals()) {
        fmt.Printf("%v ", v)
    }
    wg.Wait()
}

但是我收到了一个死锁错误all goroutines are asleep - deadlock!起初我认为我应该添加一个等待组,即使它没有在书中的示例中实现但我最终遇到了同样的错误

主要有两个问题。
Working example

第一期:

for i := 0; i < 10; i++ {
    stream := make(chan interface{})
    stream <- i
    close(stream)
    chanStream <- stream
}

创建后写入无缓冲通道,没有 goroutine 读取。使用缓冲通道或其他 goroutine。

stream := make(chan interface{}, 1) // buffer size 1 to not block `stream <- i`

第二期:
在不使用 wg.Done().
的情况下使用 wg.Add(1) 您可以在这两种情况下使用 defer

wg.Add(1)
defer wg.Done()

据我了解,您根本不需要 WaitGroup,您只需要 re-order genVals 函数循环中的语句:

for i := 0; i < 10; i++ {
    stream := make(chan interface{})
    chanStream <- stream
    stream <- i
    close(stream)
}

https://go.dev/play/p/7D9OzrsvZyi