关闭完成通道后缺少 goroutine 的打印

Missing printings from goroutine after closing done channel

我是 运行 以下代码基于 "Concurrency in Go" 一书中的示例,当时我注意到并非管道中的所有关闭打印都被打印。
看到 "done multiply!" 不见了。
另一方面,NumGoroutine() 仅显示主要功能是 运行.
以下代码有什么问题? (https://play.golang.org/p/tkFgvKboVgS)

package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    generator := func(done <-chan struct{}) <-chan int {
        intStream := make(chan int)
        i:=0
        go func() {
            defer close(intStream)
            for {
                select {
                case <-done:
                    fmt.Println("done generator!")
                    return
                case intStream <- i:
                    time.Sleep(1 * time.Second)
                    i++
                }
                fmt.Println("generator after select")
            }
        }()
        return intStream
    }

    multiply := func(
        done <-chan struct{},
        intStream <-chan int,
        multiplier int,
    ) <-chan int {
        multipliedStream := make(chan int)
        go func() {
            defer close(multipliedStream)
            for i := range intStream {
                select {
                case <-done:
                    fmt.Println("done multiply !")
                    return
                case multipliedStream <- i * multiplier:
                }
                fmt.Println("multiply after select")
            }
        }()
        return multipliedStream
    }
    add := func(
        done <-chan struct{},
        intStream <-chan int,
        additive int,
    ) <-chan int {
        addedStream := make(chan int)
        go func() {
            defer close(addedStream)
            for i := range intStream {
                select {
                case <-done:
                    fmt.Println("done add !")
                    return
                case addedStream <- i + additive:
                }
                fmt.Println("add after select")
            }
        }()
        return addedStream
    }

    done := make(chan struct{})

    intStream := generator(done)
    pipeline := add(done, multiply(done, intStream, 2), 2)
    go func() {
        time.Sleep(3 * time.Second)
        close(done)
        fmt.Println("Closed done")
    }()
    for v := range pipeline {
        fmt.Println(v)
    }
    fmt.Println("finished iterating pipeline")
    time.Sleep(10 * time.Second)
    fmt.Println("ramaining goroutines:", runtime.NumGoroutine())
    fmt.Println("finished!")
}

输出:

add after select
2
multiply after select
generator after select
multiply after select
add after select
4
generator after select
multiply after select
add after select
6
generator after select
Closed done
multiply after select
done add !
finished iterating pipeline
generator after select
done generator!
ramaining goroutines: 1
finished!

有些代码路径不会打印某些 done 消息。调度程序碰巧选择了一个不打印 multiply 的那个。如果稍微更改代码(例如,在与现在不同的实例上登录),您会发现它也可能会错过 add done 消息。 (https://play.golang.org/p/meEPM5GR9Rr)。原因如下:

如果 done 消息在生成器将数字写入通道并且乘法器读取它之后立即到达,则乘法器看到 done 可用并选择它。当 multiplier 打印 done 消息时就是这种情况。如果 done 消息在 multiplier 在 for 循环中等待时到达,则 multiplier 将在输入通道(而不是 done 通道)上收到关闭消息,导致 for 循环终止而不打印 done留言。

出现问题是因为您在 for 循环中从频道读取数据,然后 selecting。在等待 for 循环从通道读取时,评估与 select 相关的事件的 none。

处理此问题的更好方法是不使用 for 循环从通道读取。例如:

for {
     select {
        case <-done:
           return
        case i, ok:= <-intstream:
           if !ok {
              return
           }
           select {
               case <- done:
                    return
               case addedStream <- i + additive:
           }
     }
}

您的 addmultiply 例程不是永远的循环,而是 for ... range 循环。因此,在每个循环的顶部,他们等待下一个整数,而不是在 select 中等待从 done 接收关闭或将结果发送到他们的流。这不是问题,但它意味着如果他们的输入流是关闭,他们将return不进入循环本身。

如果我在输入时已经 add fmt.Println calls to expose the point at which they exit due to reaching the end of their input stream, the behavior changes slightly (probably due to timing; I haven't bothered to reason much about it and )并且输出变为:

add after select
2
multiply after select
generator after select
multiply after select
add after select
4
generator after select
multiply after select
add after select
6
generator after select
Closed done
done multiply !
add got end of stream - done!
finished iterating pipeline
generator after select
done generator!
ramaining goroutines: 1
finished!

使生成器本身接收done信号通常更合理,并使流水线函数始终写入所有结果,这使它们更可预测.当然,无论谁正在读取每个管道,都必须读取到最后——但你已经在主 goroutine 中这样做了,所以我们只是将它传播到整个过程。 Here 是您的代码的简化版本,以这种方式执行;它输出:

2
generator after select
4
generator after select
6
generator after select
Closed done
8
generator after select
done generator!
multiply got end of stream - done!
add got end of stream - done!
finished iterating pipeline
remaining goroutines: 1

请注意,这一次,我们从最终生成的值 (3) 中得到最终的计算值 (8)。