多个 goroutines 在一个通道上有选择地监听

Multiple goroutines listening selectively on one channel

我看过 this, , and 但 none 在这种情况下确实对我有帮助。 如果通道中的值是针对特定 goroutine 的,我有多个 goroutine 需要执行某些任务。

var uuidChan chan string

func handleEntity(entityUuid string) {
    go func() {
        for {
            select {
            case uuid := <-uuidChan:
                if uuid == entityUuid {
                    // logic
                    println(uuid)
                    return
                }
            case <-time.After(time.Second * 5):
                println("Timeout")
                return
            }
        }
    }()
}

func main() {
    uuidChan = make(chan (string))
    for i := 0; i < 5; i++ {
        handleEntity(fmt.Sprintf("%d", i))
    }
    for i := 0; i < 4; i++ {
        uuidChan <- fmt.Sprintf("%d", i)
    }
}

https://play.golang.org/p/Pu5MhSP9Qtj

在上述逻辑中,uuid 被其中一个通道接收到,但没有任何反应。为了解决这个问题,如果某些 uuid 的逻辑不在该例程中,我尝试更改逻辑以将 uuid 重新插入回通道。我知道这是一种不好的做法,而且也行不通。

func handleEntity(entityUuid string) {
    go func() {
        var notMe []string // stores list of uuids that can't be handled by this routine and re-inserts it in channel.
        for {
            select {
            case uuid := <-uuidChan:
                if uuid == entityUuid {
                    // logic
                    println(uuid)
                    return
                } else {
                    notMe = append(notMe, uuid)
                }
            case <-time.After(time.Second * 5):
                println("Timeout")
                defer func() {
                    for _, uuid := range notMe {
                        uuidChan <- uuid
                    }
                }()
                return
            }
        }
    }()
}

https://play.golang.org/p/5On-Vd7UzqP

执行此操作的正确方法是什么?

也许您想映射您的频道以立即将消息发送到正确的 goroutine:

package main

import (
    "fmt"
    "time"
)

func worker(u string, c chan string) {
    for {
        fmt.Printf("got %s in %s\n", <-c, u)
    }
}

func main() {
    workers := make(map[string]chan string)

    for _, u := range []string{"foo", "bar", "baz"} {
        workers[u] = make(chan string)
        go worker(u, workers[u])
    }

    workers["foo"] <- "hello"
    workers["bar"] <- "world"
    workers["baz"] <- "!"

    fmt.Println()

    time.Sleep(time.Second)
}

您有一个 盒子,里面有一个标签,所以接收者应该先阅读标签,然后决定如何处理它.如果您将标签放在盒子里 - 您是在强迫接收者打开盒子(参见解决方案编号 1)。我鼓励您提供更好的邮政服务,并至少将标签放在盒子外面(参见解决方案编号 3)- 或者最好立即将盒子送到正确的地址(参见解决方案编号 2):

解决这个问题的方法有很多种,只是你的想象力有限:
1. 由于对于具有 ID 的消费者,您只有一个通道,其中包含一个带有 ID 的数据,并且您只能从该通道读取一次数据(假设通道内数据的顺序很重要) - 您有一个简单的解决方案:使用读取 goroutine 从通道读取数据,然后应用逻辑来决定如何处理这些数据 - 例如将它发送到另一个 goroutine 或 运行 一个任务。
试试 this:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    uuidChan := make(chan string)
    var wg sync.WaitGroup

    wg.Add(1)
    go func() {
        defer wg.Done()
        t := time.NewTimer(5 * time.Second)
        defer t.Stop()
        for {
            select {
            case uuid, ok := <-uuidChan:
                if !ok {
                    fmt.Println("Channel closed.")
                    return
                }
// logic:
                wg.Add(1)
                // Multiple goroutines listening selectively on one channel
                go consume(uuid, &wg)
                // switch uuid {case 1: go func1(); case 2: go func2()}

            case <-t.C:
                fmt.Println("Timeout")
                return
            }
        }
    }()

    for i := 0; i < 4; i++ {
        uuidChan <- fmt.Sprintf("%d", i)
    }
    close(uuidChan) // free up the goroutine

    wg.Wait() // wait until all consumers are done
    fmt.Println("All done.")
}

// Multiple goroutines listening selectively on one channel
func consume(uuid string, wg *sync.WaitGroup) {
    defer wg.Done()
// logic: or decide here based on uuid
    fmt.Println("job #:", uuid) // job
}

输出:

job #: 0
job #: 2
job #: 1
Channel closed.
job #: 3
All done.

  1. 每个 goroutine 使用一个通道,尝试 this:
package main

import (
    "fmt"
    "sync"
    "time"
)

func handleEntity(uuidChan chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    // for {
    select {
    case uuid, ok := <-uuidChan:
        if !ok {
            fmt.Println("closed")
            return // free up goroutine on chan closed
        }
        fmt.Println(uuid)
        return // job done

    case <-time.After(1 * time.Second):
        fmt.Println("Timeout")
        return
    }
    // }
}

func main() {
    const max = 5
    slice := make([]chan string, max)
    var wg sync.WaitGroup

    for i := 0; i < max; i++ {
        slice[i] = make(chan string, 1)

        wg.Add(1)
        go handleEntity(slice[i], &wg)
    }

    for i := 0; i < 4; i++ {
        slice[i] <- fmt.Sprintf("%d", i) // send to the numbered channel
    }

    wg.Wait()
    fmt.Println("All done.")
}

输出:

3
0
1
2
Timeout
All done.

  1. 使用labelsync.Cond的信号广播:
    所以我们有一个 box 并使用名为 label 的共享变量,我们将接收者的地址添加到盒子的顶部。 这里使用名为 label 的共享资源,首先将框 label 设置为所需的 ID,然后使用信号广播通知所有监听的 goroutines 唤醒并检查 label 和时间以查看是否有寻址和过期与否然后全部回到等待状态,寻址或过期的继续读取无缓冲通道或退出。然后使用 time.AfterFunc 表示剩余 goroutine 的到期,最后 wg.Wait() 让它们全部加入。请注意,第一个 c.Broadcast() 应该在 c.Wait() 之后调用 - 这意味着 goroutines 应该在第一次调用 c.Broadcast() 之前 运行ning,所以一种方法是简单地使用另一个 sync.WaitGroup 名为 w4w wait for wait 的缩写。
package main

import (
    "fmt"
    "sync"
    "time"
)

func handleEntity(entityUuid string) {
    defer wg.Done()
    t0 := time.Now()
    var expired, addressed bool

    w4w.Done()
    m.Lock()
    for !expired && !addressed {
        c.Wait()
        addressed = label == entityUuid
        expired = time.Since(t0) > d
    }
    m.Unlock()

    fmt.Println("id =", entityUuid, "addressed =", addressed, "expired =", expired)
    if !expired && addressed {
        uuid := <-uuidChan
        fmt.Println("matched =", entityUuid, uuid)
    }
    fmt.Println("done", entityUuid)
}

func main() {
    for i := 0; i < 5; i++ {
        w4w.Add(1)
        wg.Add(1)
        go handleEntity(fmt.Sprintf("%d", i))
    }
    w4w.Wait()

    time.AfterFunc(d, func() {
        // m.Lock()
        // label = "none"
        // m.Unlock()
        fmt.Println("expired")
        c.Broadcast() // expired
    })

    for i := 0; i < 4; i++ {
        m.Lock()
        label = fmt.Sprintf("%d", i)
        m.Unlock()
        c.Broadcast() // notify all
        uuidChan <- label
    }

    fmt.Println("...")
    wg.Wait()
    fmt.Println("all done")
}

var (
    label    string
    uuidChan = make(chan string)
    m        sync.Mutex
    c        = sync.NewCond(&m)
    w4w, wg  sync.WaitGroup
    d        = 1 * time.Second
)

输出:

id = 0 addressed = true expired = false
matched = 0 0
done 0
id = 1 addressed = true expired = false
matched = 1 1
done 1
id = 2 addressed = true expired = false
matched = 2 2
done 2
id = 3 addressed = true expired = false
matched = 3 3
done 3
...
expired
id = 4 addressed = false expired = true
done 4
all done