select vs 在不同通道上接收多个并发协程:在逻辑或性能上有区别吗?
select vs multiple concurrent coroutines receiving on different channels : Is there a difference in logic or in performance?
Select 与在不同通道上接收的多个并发协程:逻辑或性能上有区别吗?
我的问题更笼统地说是关于 Go 中“扇入”方案的实现。在我看来,使用“select”的方案在任意大量通道(大量通道)的情况下不起作用。
参见下面示例中的 receive() 和 receive2()。
receive2()函数是不是太复杂了?矫枉过正?
为什么 select 公式被认为更地道?
package main
import (
"fmt"
"time"
)
func main() {
var ch1 = make(chan int)
var ch2 = make(chan int)
send(ch1, ch2)
//receive(ch1, ch2)
receive2(ch1, ch2)
time.Sleep(3 * time.Second)
}
func send(ch1 chan int, ch2 chan int) {
go func() {
for i := 0; i < 10; i++ {
ch1 <- i
}
close(ch1)
}()
go func() {
for i := 0; i < 10; i++ {
ch2 <- i
}
close(ch2)
}()
}
func receive(ch1 chan int, ch2 chan int) {
go func() {
for item := range ch1 {
fmt.Printf("1: %d\n", item)
}
}()
go func() {
for item := range ch2 {
fmt.Printf("2: %d\n", item)
}
}()
}
func receive2(ch1 chan int, ch2 chan int) {
for {
select {
case x, ok := <-ch1:
fmt.Println("ch1", x, ok)
if !ok {
ch1 = nil
}
case x, ok := <-ch2:
fmt.Println("ch2", x, ok)
if !ok {
ch2 = nil
}
}
if ch1 == nil && ch2 == nil {
break
}
}
}
您的实施不是扇入。扇入将多个通道的结果收集到一个通道中。像这样,您可以在单个 out
通道上接收所有值。
func fanIn(in ...chan int) chan int {
out := make(chan int)
for _, c := range in {
go func(i chan int) {
for v := range i {
out <- v
}
}(c)
}
return out
}
在您的实现中,第一个 receive()
只是从 N 个通道并发接收,彼此独立。它可以被重写以接受可变参数并且可以 看起来 更像一个实际的扇入:
func receive(cs ...chan int) {
for i, cN := range cs {
go func(i int, c chan int) {
for item := range c {
fmt.Printf("%d: %d\n", i, item)
}
}(i, cN)
}
}
receive2()
本质上是顺序的,不会缩放,因为您必须为每个通道编写一个 case
,并且 &&
它们一起写才能知道何时打破循环。 &&
可以用 sync.WaitGroup
重写,但每次迭代你仍然只处理一个项目(随机,在准备好接收的案例中)。
Select 与在不同通道上接收的多个并发协程:逻辑或性能上有区别吗?
我的问题更笼统地说是关于 Go 中“扇入”方案的实现。在我看来,使用“select”的方案在任意大量通道(大量通道)的情况下不起作用。
参见下面示例中的 receive() 和 receive2()。
receive2()函数是不是太复杂了?矫枉过正?
为什么 select 公式被认为更地道?
package main
import (
"fmt"
"time"
)
func main() {
var ch1 = make(chan int)
var ch2 = make(chan int)
send(ch1, ch2)
//receive(ch1, ch2)
receive2(ch1, ch2)
time.Sleep(3 * time.Second)
}
func send(ch1 chan int, ch2 chan int) {
go func() {
for i := 0; i < 10; i++ {
ch1 <- i
}
close(ch1)
}()
go func() {
for i := 0; i < 10; i++ {
ch2 <- i
}
close(ch2)
}()
}
func receive(ch1 chan int, ch2 chan int) {
go func() {
for item := range ch1 {
fmt.Printf("1: %d\n", item)
}
}()
go func() {
for item := range ch2 {
fmt.Printf("2: %d\n", item)
}
}()
}
func receive2(ch1 chan int, ch2 chan int) {
for {
select {
case x, ok := <-ch1:
fmt.Println("ch1", x, ok)
if !ok {
ch1 = nil
}
case x, ok := <-ch2:
fmt.Println("ch2", x, ok)
if !ok {
ch2 = nil
}
}
if ch1 == nil && ch2 == nil {
break
}
}
}
您的实施不是扇入。扇入将多个通道的结果收集到一个通道中。像这样,您可以在单个 out
通道上接收所有值。
func fanIn(in ...chan int) chan int {
out := make(chan int)
for _, c := range in {
go func(i chan int) {
for v := range i {
out <- v
}
}(c)
}
return out
}
在您的实现中,第一个 receive()
只是从 N 个通道并发接收,彼此独立。它可以被重写以接受可变参数并且可以 看起来 更像一个实际的扇入:
func receive(cs ...chan int) {
for i, cN := range cs {
go func(i int, c chan int) {
for item := range c {
fmt.Printf("%d: %d\n", i, item)
}
}(i, cN)
}
}
receive2()
本质上是顺序的,不会缩放,因为您必须为每个通道编写一个 case
,并且 &&
它们一起写才能知道何时打破循环。 &&
可以用 sync.WaitGroup
重写,但每次迭代你仍然只处理一个项目(随机,在准备好接收的案例中)。