多个 goroutines 在一个通道上有选择地监听
Multiple goroutines listening selectively on one channel
我看过 this, , and 但 none 在这种情况下确实对我有帮助。
如果通道中的值是针对特定 goroutine 的,我有多个 goroutine 需要执行某些任务。
var uuidChan chan string
func handleEntity(entityUuid string) {
go func() {
for {
select {
case uuid := <-uuidChan:
if uuid == entityUuid {
// logic
println(uuid)
return
}
case <-time.After(time.Second * 5):
println("Timeout")
return
}
}
}()
}
func main() {
uuidChan = make(chan (string))
for i := 0; i < 5; i++ {
handleEntity(fmt.Sprintf("%d", i))
}
for i := 0; i < 4; i++ {
uuidChan <- fmt.Sprintf("%d", i)
}
}
https://play.golang.org/p/Pu5MhSP9Qtj
在上述逻辑中,uuid 被其中一个通道接收到,但没有任何反应。为了解决这个问题,如果某些 uuid 的逻辑不在该例程中,我尝试更改逻辑以将 uuid 重新插入回通道。我知道这是一种不好的做法,而且也行不通。
func handleEntity(entityUuid string) {
go func() {
var notMe []string // stores list of uuids that can't be handled by this routine and re-inserts it in channel.
for {
select {
case uuid := <-uuidChan:
if uuid == entityUuid {
// logic
println(uuid)
return
} else {
notMe = append(notMe, uuid)
}
case <-time.After(time.Second * 5):
println("Timeout")
defer func() {
for _, uuid := range notMe {
uuidChan <- uuid
}
}()
return
}
}
}()
}
https://play.golang.org/p/5On-Vd7UzqP
执行此操作的正确方法是什么?
也许您想映射您的频道以立即将消息发送到正确的 goroutine:
package main
import (
"fmt"
"time"
)
func worker(u string, c chan string) {
for {
fmt.Printf("got %s in %s\n", <-c, u)
}
}
func main() {
workers := make(map[string]chan string)
for _, u := range []string{"foo", "bar", "baz"} {
workers[u] = make(chan string)
go worker(u, workers[u])
}
workers["foo"] <- "hello"
workers["bar"] <- "world"
workers["baz"] <- "!"
fmt.Println()
time.Sleep(time.Second)
}
您有一个 盒子,里面有一个标签,所以接收者应该先阅读标签,然后决定如何处理它.如果您将标签放在盒子里 - 您是在强迫接收者打开盒子(参见解决方案编号 1)。我鼓励您提供更好的邮政服务,并至少将标签放在盒子外面(参见解决方案编号 3)- 或者最好立即将盒子送到正确的地址(参见解决方案编号 2):
解决这个问题的方法有很多种,只是你的想象力有限:
1.
由于对于具有 ID 的消费者,您只有一个通道,其中包含一个带有 ID 的数据,并且您只能从该通道读取一次数据(假设通道内数据的顺序很重要) - 您有一个简单的解决方案:使用读取 goroutine 从通道读取数据,然后应用逻辑来决定如何处理这些数据 - 例如将它发送到另一个 goroutine 或 运行 一个任务。
试试 this:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
uuidChan := make(chan string)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
t := time.NewTimer(5 * time.Second)
defer t.Stop()
for {
select {
case uuid, ok := <-uuidChan:
if !ok {
fmt.Println("Channel closed.")
return
}
// logic:
wg.Add(1)
// Multiple goroutines listening selectively on one channel
go consume(uuid, &wg)
// switch uuid {case 1: go func1(); case 2: go func2()}
case <-t.C:
fmt.Println("Timeout")
return
}
}
}()
for i := 0; i < 4; i++ {
uuidChan <- fmt.Sprintf("%d", i)
}
close(uuidChan) // free up the goroutine
wg.Wait() // wait until all consumers are done
fmt.Println("All done.")
}
// Multiple goroutines listening selectively on one channel
func consume(uuid string, wg *sync.WaitGroup) {
defer wg.Done()
// logic: or decide here based on uuid
fmt.Println("job #:", uuid) // job
}
输出:
job #: 0
job #: 2
job #: 1
Channel closed.
job #: 3
All done.
- 每个 goroutine 使用一个通道,尝试 this:
package main
import (
"fmt"
"sync"
"time"
)
func handleEntity(uuidChan chan string, wg *sync.WaitGroup) {
defer wg.Done()
// for {
select {
case uuid, ok := <-uuidChan:
if !ok {
fmt.Println("closed")
return // free up goroutine on chan closed
}
fmt.Println(uuid)
return // job done
case <-time.After(1 * time.Second):
fmt.Println("Timeout")
return
}
// }
}
func main() {
const max = 5
slice := make([]chan string, max)
var wg sync.WaitGroup
for i := 0; i < max; i++ {
slice[i] = make(chan string, 1)
wg.Add(1)
go handleEntity(slice[i], &wg)
}
for i := 0; i < 4; i++ {
slice[i] <- fmt.Sprintf("%d", i) // send to the numbered channel
}
wg.Wait()
fmt.Println("All done.")
}
输出:
3
0
1
2
Timeout
All done.
- 使用
label
和sync.Cond
的信号广播:
所以我们有一个 box 并使用名为 label
的共享变量,我们将接收者的地址添加到盒子的顶部。
这里使用名为 label
的共享资源,首先将框 label
设置为所需的 ID,然后使用信号广播通知所有监听的 goroutines 唤醒并检查 label
和时间以查看是否有寻址和过期与否然后全部回到等待状态,寻址或过期的继续读取无缓冲通道或退出。然后使用 time.AfterFunc
表示剩余 goroutine 的到期,最后 wg.Wait()
让它们全部加入。请注意,第一个 c.Broadcast()
应该在 c.Wait()
之后调用 - 这意味着 goroutines 应该在第一次调用 c.Broadcast()
之前 运行ning,所以一种方法是简单地使用另一个 sync.WaitGroup
名为 w4w
wait for wait
的缩写。
package main
import (
"fmt"
"sync"
"time"
)
func handleEntity(entityUuid string) {
defer wg.Done()
t0 := time.Now()
var expired, addressed bool
w4w.Done()
m.Lock()
for !expired && !addressed {
c.Wait()
addressed = label == entityUuid
expired = time.Since(t0) > d
}
m.Unlock()
fmt.Println("id =", entityUuid, "addressed =", addressed, "expired =", expired)
if !expired && addressed {
uuid := <-uuidChan
fmt.Println("matched =", entityUuid, uuid)
}
fmt.Println("done", entityUuid)
}
func main() {
for i := 0; i < 5; i++ {
w4w.Add(1)
wg.Add(1)
go handleEntity(fmt.Sprintf("%d", i))
}
w4w.Wait()
time.AfterFunc(d, func() {
// m.Lock()
// label = "none"
// m.Unlock()
fmt.Println("expired")
c.Broadcast() // expired
})
for i := 0; i < 4; i++ {
m.Lock()
label = fmt.Sprintf("%d", i)
m.Unlock()
c.Broadcast() // notify all
uuidChan <- label
}
fmt.Println("...")
wg.Wait()
fmt.Println("all done")
}
var (
label string
uuidChan = make(chan string)
m sync.Mutex
c = sync.NewCond(&m)
w4w, wg sync.WaitGroup
d = 1 * time.Second
)
输出:
id = 0 addressed = true expired = false
matched = 0 0
done 0
id = 1 addressed = true expired = false
matched = 1 1
done 1
id = 2 addressed = true expired = false
matched = 2 2
done 2
id = 3 addressed = true expired = false
matched = 3 3
done 3
...
expired
id = 4 addressed = false expired = true
done 4
all done
我看过 this,
var uuidChan chan string
func handleEntity(entityUuid string) {
go func() {
for {
select {
case uuid := <-uuidChan:
if uuid == entityUuid {
// logic
println(uuid)
return
}
case <-time.After(time.Second * 5):
println("Timeout")
return
}
}
}()
}
func main() {
uuidChan = make(chan (string))
for i := 0; i < 5; i++ {
handleEntity(fmt.Sprintf("%d", i))
}
for i := 0; i < 4; i++ {
uuidChan <- fmt.Sprintf("%d", i)
}
}
https://play.golang.org/p/Pu5MhSP9Qtj
在上述逻辑中,uuid 被其中一个通道接收到,但没有任何反应。为了解决这个问题,如果某些 uuid 的逻辑不在该例程中,我尝试更改逻辑以将 uuid 重新插入回通道。我知道这是一种不好的做法,而且也行不通。
func handleEntity(entityUuid string) {
go func() {
var notMe []string // stores list of uuids that can't be handled by this routine and re-inserts it in channel.
for {
select {
case uuid := <-uuidChan:
if uuid == entityUuid {
// logic
println(uuid)
return
} else {
notMe = append(notMe, uuid)
}
case <-time.After(time.Second * 5):
println("Timeout")
defer func() {
for _, uuid := range notMe {
uuidChan <- uuid
}
}()
return
}
}
}()
}
https://play.golang.org/p/5On-Vd7UzqP
执行此操作的正确方法是什么?
也许您想映射您的频道以立即将消息发送到正确的 goroutine:
package main
import (
"fmt"
"time"
)
func worker(u string, c chan string) {
for {
fmt.Printf("got %s in %s\n", <-c, u)
}
}
func main() {
workers := make(map[string]chan string)
for _, u := range []string{"foo", "bar", "baz"} {
workers[u] = make(chan string)
go worker(u, workers[u])
}
workers["foo"] <- "hello"
workers["bar"] <- "world"
workers["baz"] <- "!"
fmt.Println()
time.Sleep(time.Second)
}
您有一个 盒子,里面有一个标签,所以接收者应该先阅读标签,然后决定如何处理它.如果您将标签放在盒子里 - 您是在强迫接收者打开盒子(参见解决方案编号 1)。我鼓励您提供更好的邮政服务,并至少将标签放在盒子外面(参见解决方案编号 3)- 或者最好立即将盒子送到正确的地址(参见解决方案编号 2):
解决这个问题的方法有很多种,只是你的想象力有限:
1.
由于对于具有 ID 的消费者,您只有一个通道,其中包含一个带有 ID 的数据,并且您只能从该通道读取一次数据(假设通道内数据的顺序很重要) - 您有一个简单的解决方案:使用读取 goroutine 从通道读取数据,然后应用逻辑来决定如何处理这些数据 - 例如将它发送到另一个 goroutine 或 运行 一个任务。
试试 this:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
uuidChan := make(chan string)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
t := time.NewTimer(5 * time.Second)
defer t.Stop()
for {
select {
case uuid, ok := <-uuidChan:
if !ok {
fmt.Println("Channel closed.")
return
}
// logic:
wg.Add(1)
// Multiple goroutines listening selectively on one channel
go consume(uuid, &wg)
// switch uuid {case 1: go func1(); case 2: go func2()}
case <-t.C:
fmt.Println("Timeout")
return
}
}
}()
for i := 0; i < 4; i++ {
uuidChan <- fmt.Sprintf("%d", i)
}
close(uuidChan) // free up the goroutine
wg.Wait() // wait until all consumers are done
fmt.Println("All done.")
}
// Multiple goroutines listening selectively on one channel
func consume(uuid string, wg *sync.WaitGroup) {
defer wg.Done()
// logic: or decide here based on uuid
fmt.Println("job #:", uuid) // job
}
输出:
job #: 0
job #: 2
job #: 1
Channel closed.
job #: 3
All done.
- 每个 goroutine 使用一个通道,尝试 this:
package main
import (
"fmt"
"sync"
"time"
)
func handleEntity(uuidChan chan string, wg *sync.WaitGroup) {
defer wg.Done()
// for {
select {
case uuid, ok := <-uuidChan:
if !ok {
fmt.Println("closed")
return // free up goroutine on chan closed
}
fmt.Println(uuid)
return // job done
case <-time.After(1 * time.Second):
fmt.Println("Timeout")
return
}
// }
}
func main() {
const max = 5
slice := make([]chan string, max)
var wg sync.WaitGroup
for i := 0; i < max; i++ {
slice[i] = make(chan string, 1)
wg.Add(1)
go handleEntity(slice[i], &wg)
}
for i := 0; i < 4; i++ {
slice[i] <- fmt.Sprintf("%d", i) // send to the numbered channel
}
wg.Wait()
fmt.Println("All done.")
}
输出:
3
0
1
2
Timeout
All done.
- 使用
label
和sync.Cond
的信号广播:
所以我们有一个 box 并使用名为label
的共享变量,我们将接收者的地址添加到盒子的顶部。 这里使用名为label
的共享资源,首先将框label
设置为所需的 ID,然后使用信号广播通知所有监听的 goroutines 唤醒并检查label
和时间以查看是否有寻址和过期与否然后全部回到等待状态,寻址或过期的继续读取无缓冲通道或退出。然后使用time.AfterFunc
表示剩余 goroutine 的到期,最后wg.Wait()
让它们全部加入。请注意,第一个c.Broadcast()
应该在c.Wait()
之后调用 - 这意味着 goroutines 应该在第一次调用c.Broadcast()
之前 运行ning,所以一种方法是简单地使用另一个sync.WaitGroup
名为w4w
wait for wait
的缩写。
package main
import (
"fmt"
"sync"
"time"
)
func handleEntity(entityUuid string) {
defer wg.Done()
t0 := time.Now()
var expired, addressed bool
w4w.Done()
m.Lock()
for !expired && !addressed {
c.Wait()
addressed = label == entityUuid
expired = time.Since(t0) > d
}
m.Unlock()
fmt.Println("id =", entityUuid, "addressed =", addressed, "expired =", expired)
if !expired && addressed {
uuid := <-uuidChan
fmt.Println("matched =", entityUuid, uuid)
}
fmt.Println("done", entityUuid)
}
func main() {
for i := 0; i < 5; i++ {
w4w.Add(1)
wg.Add(1)
go handleEntity(fmt.Sprintf("%d", i))
}
w4w.Wait()
time.AfterFunc(d, func() {
// m.Lock()
// label = "none"
// m.Unlock()
fmt.Println("expired")
c.Broadcast() // expired
})
for i := 0; i < 4; i++ {
m.Lock()
label = fmt.Sprintf("%d", i)
m.Unlock()
c.Broadcast() // notify all
uuidChan <- label
}
fmt.Println("...")
wg.Wait()
fmt.Println("all done")
}
var (
label string
uuidChan = make(chan string)
m sync.Mutex
c = sync.NewCond(&m)
w4w, wg sync.WaitGroup
d = 1 * time.Second
)
输出:
id = 0 addressed = true expired = false
matched = 0 0
done 0
id = 1 addressed = true expired = false
matched = 1 1
done 1
id = 2 addressed = true expired = false
matched = 2 2
done 2
id = 3 addressed = true expired = false
matched = 3 3
done 3
...
expired
id = 4 addressed = false expired = true
done 4
all done