select vs 在不同通道上接收多个并发协程:在逻辑或性能上有区别吗?

select vs multiple concurrent coroutines receiving on different channels : Is there a difference in logic or in performance?

Select 与在不同通道上接收的多个并发协程:逻辑或性能上有区别吗?

我的问题更笼统地说是关于 Go 中“扇入”方案的实现。在我看来,使用“select”的方案在任意大量通道(大量通道)的情况下不起作用。

参见下面示例中的 receive() 和 receive2()。

receive2()函数是不是太复杂了?矫枉过正?

为什么 select 公式被认为更地道?

package main

import (
    "fmt"
    "time"
)

func main() {

    var ch1 = make(chan int)
    var ch2 = make(chan int)

    send(ch1, ch2)
    //receive(ch1, ch2)
    receive2(ch1, ch2)

    time.Sleep(3 * time.Second)

}

func send(ch1 chan int, ch2 chan int) {
    go func() {
        for i := 0; i < 10; i++ {
            ch1 <- i
        }
        close(ch1)
    }()
    go func() {
        for i := 0; i < 10; i++ {
            ch2 <- i
        }
        close(ch2)
    }()
}

func receive(ch1 chan int, ch2 chan int) {
    go func() {
        for item := range ch1 {
            fmt.Printf("1: %d\n", item)
        }
    }()

    go func() {
        for item := range ch2 {
            fmt.Printf("2: %d\n", item)
        }
    }()
}

func receive2(ch1 chan int, ch2 chan int) {
    for {
        select {
        case x, ok := <-ch1:
            fmt.Println("ch1", x, ok)
            if !ok {
                ch1 = nil
            }
        case x, ok := <-ch2:
            fmt.Println("ch2", x, ok)
            if !ok {
                ch2 = nil
            }
        }

        if ch1 == nil && ch2 == nil {
            break
        }
    }
}

您的实施不是扇入。扇入将多个通道的结果收集到一个通道中。像这样,您可以在单个 out 通道上接收所有值。

func fanIn(in ...chan int) chan int {
    out := make(chan int)
    for _, c := range in {
        go func(i chan int) {
            for v := range i {
                out <- v
            }

        }(c)
    }
    return out
}

在您的实现中,第一个 receive() 只是从 N 个通道并发接收,彼此独立。它可以被重写以接受可变参数并且可以 看起来 更像一个实际的扇入:

func receive(cs ...chan int) {
    for i, cN := range cs {
        go func(i int, c chan int) {
            for item := range c {
                fmt.Printf("%d: %d\n", i, item)
            }
        }(i, cN)
    }
}

receive2() 本质上是顺序的,不会缩放,因为您必须为每个通道编写一个 case,并且 && 它们一起写才能知道何时打破循环。 && 可以用 sync.WaitGroup 重写,但每次迭代你仍然只处理一个项目(随机,在准备好接收的案例中)。