如何将消息发送到多个通道
How to send a message to multiple channels in Go
所以我的问题是:如何把消息发送给 broadcast 函数接收到的每一个通道,并且只在通道未关闭时发送,每个通道只发送一次。
每成功发送一条消息,sentNumber 就应该加一。
提醒一下,向所有通道发送消息有一个总的时间限制!
package main
import (
"fmt"
"sync"
"time"
)
// sentNumber is a package-level counter.
// NOTE(review): broadcast declares a named result that is also called
// sentNumber, which shadows this variable inside that function.
var (
sentNumber int
)
// broadcast starts one send goroutine per channel in ch and returns without
// waiting for any of them to finish.
//
// The elapsed-time check only guards the loop that launches goroutines; it
// does not bound how long an already started send may run.
// NOTE(review): the function always returns 0, so the named result
// sentNumber is never updated and callers cannot learn how many sends
// succeeded.
func broadcast(waitTime time.Duration, message string, ch ...chan string) (sentNumber int) {
start := time.Now()
for _, channel := range ch {
// Stop launching further senders once waitTime has elapsed.
if time.Since(start) >= waitTime {
break
}
go send(channel, message)
}
return 0
}
// send loops on receives from channel until one reports open == true, then
// hands message to a new goroutine that performs the actual send.
//
// NOTE(review): the probing receive consumes a value from the channel, blocks
// while the channel is open and empty, and never leaves the loop once the
// channel is closed (a receive on a closed channel returns immediately with
// open == false).
func send(channel chan string, message string) {
for {
if _,open := <-channel; open{
break
}
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
// NOTE(review): Done is called before the send below, so wg.Wait only
// confirms that this goroutine has started, not that message was delivered.
wg.Done()
channel <- message
}()
wg.Wait()
}
// main creates two buffered channels, broadcasts one message to both, and
// then prints one value received from each.
func main() {
a := make(chan string, 1)
b := make(chan string, 1)
// NOTE(review): the untyped constant 5 becomes time.Duration(5), i.e. five
// nanoseconds, because time.Duration counts nanoseconds.
broadcast(5, "secret message", a, b)
fmt.Println(<-a)
fmt.Println(<-b)
}
`time.Since(start) >= waitTime` 这个判断无法中断 send 函数。
在这种情况下,`go send(channel, message)` 并不会比单线程队列更高效。
broadcast 没有责任检查通道是否已关闭,因为这些通道不是由 broadcast 创建或关闭的。
package main
import (
"context"
"fmt"
"time"
)
// broadcast delivers message once to each channel in chs, giving up when
// waitTime has elapsed, and reports how many channels received it.
//
// All sends are attempted from the calling goroutine: every channel that is
// not ready yet is kept in a pending list and retried until it accepts the
// message or the deadline passes. The function returns as soon as every
// channel has been served (including immediately when chs is empty), so it
// never blocks once there is nothing left to send.
//
// broadcast does not own the channels and does not check whether they are
// closed; sending on a closed channel panics, as with any Go send. A nil
// channel is never ready and simply counts as not sent.
func broadcast(waitTime time.Duration, message string, chs ...chan string) (sentNumber int) {
	// Pause between passes in which no channel was ready, so waiting for a
	// slow receiver does not spin a CPU core.
	const retryInterval = time.Millisecond

	ctx, cancel := context.WithTimeout(context.Background(), waitTime)
	defer cancel()

	// Work on a copy so the caller's slice is left untouched.
	pending := make([]chan string, len(chs))
	copy(pending, chs)

	for len(pending) > 0 {
		// Filter in place: the write index never overtakes the read index.
		retry := pending[:0]
		for _, c := range pending {
			select {
			case c <- message:
				sentNumber++
			case <-ctx.Done():
				// Deadline reached: report what was delivered so far.
				return sentNumber
			default:
				// Receiver not ready yet; try again on the next pass.
				retry = append(retry, c)
			}
		}
		if len(retry) == len(pending) {
			// No progress in this pass: wait briefly or until the deadline.
			select {
			case <-ctx.Done():
				return sentNumber
			case <-time.After(retryInterval):
			}
		}
		pending = retry
	}
	return sentNumber
}
// main demonstrates broadcast with two unbuffered channels whose receivers
// become ready after one and three seconds, using a two-second send window.
func main() {
	first := make(chan string)
	second := make(chan string)

	// receiveAfter starts a consumer that sleeps for delay and then prints
	// one value read from ch, prefixed with label.
	receiveAfter := func(label string, delay time.Duration, ch chan string) {
		go func() {
			time.Sleep(delay)
			fmt.Println(label, <-ch)
		}()
	}
	receiveAfter("a:", time.Second, first)
	receiveAfter("b:", 3*time.Second, second)

	sent := broadcast(2*time.Second, "secret message", first, second)
	fmt.Printf("sent count:%d\n", sent)

	// Keep the process alive long enough for the slower receiver to wake up.
	time.Sleep(3 * time.Second)
}
所以我的问题是:如何把消息发送给 broadcast 函数接收到的每一个通道,并且只在通道未关闭时发送,每个通道只发送一次。
每成功发送一条消息,sentNumber 就应该加一。
提醒一下,向所有通道发送消息有一个总的时间限制!
package main
import (
"fmt"
"sync"
"time"
)
// sentNumber is a package-level counter.
// NOTE(review): broadcast declares a named result that is also called
// sentNumber, which shadows this variable inside that function.
var (
sentNumber int
)
// broadcast starts one send goroutine per channel in ch and returns without
// waiting for any of them to finish.
//
// The elapsed-time check only guards the loop that launches goroutines; it
// does not bound how long an already started send may run.
// NOTE(review): the function always returns 0, so the named result
// sentNumber is never updated and callers cannot learn how many sends
// succeeded.
func broadcast(waitTime time.Duration, message string, ch ...chan string) (sentNumber int) {
start := time.Now()
for _, channel := range ch {
// Stop launching further senders once waitTime has elapsed.
if time.Since(start) >= waitTime {
break
}
go send(channel, message)
}
return 0
}
// send loops on receives from channel until one reports open == true, then
// hands message to a new goroutine that performs the actual send.
//
// NOTE(review): the probing receive consumes a value from the channel, blocks
// while the channel is open and empty, and never leaves the loop once the
// channel is closed (a receive on a closed channel returns immediately with
// open == false).
func send(channel chan string, message string) {
for {
if _,open := <-channel; open{
break
}
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
// NOTE(review): Done is called before the send below, so wg.Wait only
// confirms that this goroutine has started, not that message was delivered.
wg.Done()
channel <- message
}()
wg.Wait()
}
// main creates two buffered channels, broadcasts one message to both, and
// then prints one value received from each.
func main() {
a := make(chan string, 1)
b := make(chan string, 1)
// NOTE(review): the untyped constant 5 becomes time.Duration(5), i.e. five
// nanoseconds, because time.Duration counts nanoseconds.
broadcast(5, "secret message", a, b)
fmt.Println(<-a)
fmt.Println(<-b)
}
`time.Since(start) >= waitTime` 这个判断无法中断 send 函数。
在这种情况下,`go send(channel, message)` 并不会比单线程队列更高效。
broadcast 没有责任检查通道是否已关闭,因为这些通道不是由 broadcast 创建或关闭的。
package main
import (
"context"
"fmt"
"time"
)
// broadcast delivers message once to each channel in chs, giving up when
// waitTime has elapsed, and reports how many channels received it.
//
// All sends are attempted from the calling goroutine: every channel that is
// not ready yet is kept in a pending list and retried until it accepts the
// message or the deadline passes. The function returns as soon as every
// channel has been served (including immediately when chs is empty), so it
// never blocks once there is nothing left to send.
//
// broadcast does not own the channels and does not check whether they are
// closed; sending on a closed channel panics, as with any Go send. A nil
// channel is never ready and simply counts as not sent.
func broadcast(waitTime time.Duration, message string, chs ...chan string) (sentNumber int) {
	// Pause between passes in which no channel was ready, so waiting for a
	// slow receiver does not spin a CPU core.
	const retryInterval = time.Millisecond

	ctx, cancel := context.WithTimeout(context.Background(), waitTime)
	defer cancel()

	// Work on a copy so the caller's slice is left untouched.
	pending := make([]chan string, len(chs))
	copy(pending, chs)

	for len(pending) > 0 {
		// Filter in place: the write index never overtakes the read index.
		retry := pending[:0]
		for _, c := range pending {
			select {
			case c <- message:
				sentNumber++
			case <-ctx.Done():
				// Deadline reached: report what was delivered so far.
				return sentNumber
			default:
				// Receiver not ready yet; try again on the next pass.
				retry = append(retry, c)
			}
		}
		if len(retry) == len(pending) {
			// No progress in this pass: wait briefly or until the deadline.
			select {
			case <-ctx.Done():
				return sentNumber
			case <-time.After(retryInterval):
			}
		}
		pending = retry
	}
	return sentNumber
}
// main demonstrates broadcast with two unbuffered channels whose receivers
// become ready after one and three seconds, using a two-second send window.
func main() {
	first := make(chan string)
	second := make(chan string)

	// receiveAfter starts a consumer that sleeps for delay and then prints
	// one value read from ch, prefixed with label.
	receiveAfter := func(label string, delay time.Duration, ch chan string) {
		go func() {
			time.Sleep(delay)
			fmt.Println(label, <-ch)
		}()
	}
	receiveAfter("a:", time.Second, first)
	receiveAfter("b:", 3*time.Second, second)

	sent := broadcast(2*time.Second, "secret message", first, second)
	fmt.Printf("sent count:%d\n", sent)

	// Keep the process alive long enough for the slower receiver to wake up.
	time.Sleep(3 * time.Second)
}