当涉及多个渠道时,select 是如何工作的?

How does select work when multiple channels are involved?

我在

等多个非缓冲通道上使用 select 时发现
select {
case <- chana:
case <- chanb:
}

即使两个通道都有数据,但是在处理这个select时, case chana 和 case chanb 的调用不平衡。

package main

import (
    "fmt"
    _ "net/http/pprof"
    "sync"
    "time"
)

func main() {
    chana := make(chan int)
    chanb := make(chan int)

    go func() {
        for i := 0; i < 1000; i++ {
            chana <- 100 * i
        }
    }()

    go func() {
        for i := 0; i < 1000; i++ {
            chanb <- i
        }
    }()

    time.Sleep(time.Microsecond * 300)

    acount := 0
    bcount := 0
    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
        for {
            select {
            case <-chana:
                acount++
            case <-chanb:
                bcount++
            }
            if acount == 1000 || bcount == 1000 {
                fmt.Println("finish one acount, bcount", acount, bcount)
                break
            }
        }
        wg.Done()
    }()

    wg.Wait()
}

运行这个demo,当其中一个chana,chanb结束时read/write,另一个可能还剩下999-1

有什么方法可以保证余额吗?

找到相关主题
golang-channels-select-statement

如评论中所述,如果要确保平衡,可以完全放弃在读取 goroutine 中使用 select 并依赖无缓冲通道提供的同步:

go func() {
    for {
        <-chana
        acount++
        <-chanb
        bcount++

        if acount == 1000 || bcount == 1000 {
            fmt.Println("finish one acount, bcount", acount, bcount)
            break
        }
    }
    wg.Done()
}()

已编辑:您也可以从供应方面进行平衡,但@icza 的回答对我来说似乎是一个更好的选择,并且还解释了在第一名。令人惊讶的是,即使在我的(虚拟)机器上它也是一边倒的。

这里有一些东西可以平衡供应方的两个例程(不知何故似乎在 Playground 上不起作用)。

package main

import (
    "fmt"
    _ "net/http/pprof"
    "sync"
    "sync/atomic"
    "time"
)

func main() {
    chana := make(chan int)
    chanb := make(chan int)
    var balanceSwitch int32

    go func() {
        for i := 0; i < 1000; i++ {
            for atomic.LoadInt32(&balanceSwitch) != 0 {
                fmt.Println("Holding R1")
                time.Sleep(time.Nanosecond * 1)
            }
            chana <- 100 * i
            fmt.Println("R1: Sent i", i)
            atomic.StoreInt32(&balanceSwitch, 1)

        }
    }()

    go func() {
        for i := 0; i < 1000; i++ {

            for atomic.LoadInt32(&balanceSwitch) != 1 {
                fmt.Println("Holding R2")
                time.Sleep(time.Nanosecond * 1)
            }
            chanb <- i
            fmt.Println("R2: Sent i", i)
            atomic.StoreInt32(&balanceSwitch, 0)

        }
    }()

    time.Sleep(time.Microsecond * 300)

    acount := 0
    bcount := 0
    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
        for {
            select {
            case <-chana:
                acount++
            case <-chanb:
                bcount++
            }
            fmt.Println("Acount Bcount", acount, bcount)
            if acount == 1000 || bcount == 1000 {
                fmt.Println("finish one acount, bcount", acount, bcount)
                break
            }
        }
        wg.Done()
    }()

    wg.Wait()
}

通过更改atomic.LoadInt32(&balanceSwitch) != XXatomic.StoreInt32(&balanceSwitch, X),或其他机制,您可以将其映射到任意数量的例程。这可能不是最好的做法,但如果这是一项要求,那么您可能必须考虑这样的选择。希望这有帮助。

Go select 语句不偏向于任何(就绪)情况。引用规范:

If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection. Otherwise, if there is a default case, that case is chosen. If there is no default case, the "select" statement blocks until at least one of the communications can proceed.

如果可以进行多个通信,则随机选择一个。这不是一个完美的随机分布,规范也不能保证这一点,但它是随机的。

你遇到的是 Go Playground 有 GOMAXPROCS=1 (which you can verify here) 并且 goroutine 调度器没有被抢占的结果。这意味着默认情况下 goroutines 不是并行执行的。如果遇到阻塞操作(例如从网络读取,或尝试从阻塞的通道接收或发送),一个 goroutine 将被置于公园,另一个准备 运行 继续。

并且由于您的代码中没有阻塞操作,goroutines 可能不会被放入 park 并且可能只有一个 "producer" goroutines 会 运行,而另一个可能不会得到预定(永远)。

运行 你的代码在我本地计算机上 GOMAXPROCS=4,我有非常 "realistic" 结果。 运行它几次,输出:

finish one acount, bcount 1000 901
finish one acount, bcount 1000 335
finish one acount, bcount 1000 872
finish one acount, bcount 427 1000

如果您需要确定单个案例的优先级,请查看此答案:

select 的默认行为不保证相等的优先级,但平均而言它会接近它。如果你需要保证相同的优先级,那么你不应该使用 select,但是你可以从 2 个通道做一系列的 2 个非阻塞接收,它看起来像这样:

for {
    select {
    case <-chana:
        acount++
    default:
    }
    select {
    case <-chanb:
        bcount++
    default:
    }
    if acount == 1000 || bcount == 1000 {
        fmt.Println("finish one acount, bcount", acount, bcount)
        break
    }
}

如果两个都提供值,上述 2 个非阻塞接收将以相同的速度(具有相同的优先级)耗尽 2 个通道,如果一个不提供,则不断从另一个接收而不会延迟或阻塞。

关于这一点需要注意的一件事是,如果 none 的通道提供任何要接收的值,这基本上是一个 "busy" 循环并且因此消耗计算能力。为避免这种情况,我们可能会检测到 none 个通道已准备就绪,然后 然后 对两个接收使用 select 语句,然后将阻塞直到其中之一已准备好接收,不会浪费任何 CPU 资源:

for {
    received := 0
    select {
    case <-chana:
        acount++
        received++
    default:
    }
    select {
    case <-chanb:
        bcount++
        received++
    default:
    }

    if received == 0 {
        select {
        case <-chana:
            acount++
        case <-chanb:
            bcount++
        }
    }

    if acount == 1000 || bcount == 1000 {
        fmt.Println("finish one acount, bcount", acount, bcount)
        break
    }
}

关于goroutine调度的更多细节,请看这些问题:

Goroutines 8kb and windows OS thread 1 mb

Why does it not create many threads when many goroutines are blocked in writing file in golang?