如何使用频道广播消息
How to broadcast message using channel
我是新来的,我正在尝试创建一个简单的聊天服务器,客户端可以在其中向所有连接的客户端广播消息。
在我的服务器中,我有一个接受连接的 goroutine(无限循环),所有连接都由一个通道接收。
go func() {
for {
conn, _ := listener.Accept()
ch <- conn
}
}()
然后,我为每个连接的客户端启动一个处理程序(goroutine)。在处理程序内部,我尝试通过遍历通道向所有连接广播。
for c := range ch {
conn.Write(msg)
}
但是,我无法广播,因为(我认为从阅读文档来看)在迭代之前需要关闭频道。我不确定什么时候应该关闭频道,因为我想不断接受新连接,而关闭频道不会让我这样做。如果有人可以帮助我,或者提供一种更好的方式向所有连接的客户端广播消息,我们将不胜感激。
你做的是扇出模式,也就是说,多个端点正在监听一个输入源。这种模式的结果是,只要输入源中有消息,这些侦听器中只有一个能够获得消息。唯一的例外是 close
频道。 close
将被所有听众识别,因此 "broadcast".
但是你想要做的是广播一条从连接中读取的消息,所以我们可以这样做:
当听众数量已知时
让每个worker收听专用广播频道,并将消息从主频道发送到每个专用广播频道。
type worker struct {
source chan interface{}
quit chan struct{}
}
func (w *worker) Start() {
w.source = make(chan interface{}, 10) // some buffer size to avoid blocking
go func() {
for {
select {
case msg := <-w.source
// do something with msg
case <-quit: // will explain this in the last section
return
}
}
}()
}
然后我们可以有一群工人:
workers := []*worker{&worker{}, &worker{}}
for _, worker := range workers { worker.Start() }
然后启动我们的监听器:
go func() {
for {
conn, _ := listener.Accept()
ch <- conn
}
}()
还有一个调度员:
go func() {
for {
msg := <- ch
for _, worker := workers {
worker.source <- msg
}
}
}()
当听众数量未知时
在这种情况下,上面给出的解决方案仍然有效。唯一不同的是,每当你需要一个新的 worker 时,你需要创建一个新的 worker,启动它,然后将它推入 workers
切片。但是这个方法需要一个线程安全的切片,它需要一个锁。其中一种实现可能如下所示:
type threadSafeSlice struct {
sync.Mutex
workers []*worker
}
func (slice *threadSafeSlice) Push(w *worker) {
slice.Lock()
defer slice.Unlock()
workers = append(workers, w)
}
func (slice *threadSafeSlice) Iter(routine func(*worker)) {
slice.Lock()
defer slice.Unlock()
for _, worker := range workers {
routine(worker)
}
}
每当你想启动一个工人时:
w := &worker{}
w.Start()
threadSafeSlice.Push(w)
您的调度员将更改为:
go func() {
for {
msg := <- ch
threadSafeSlice.Iter(func(w *worker) { w.source <- msg })
}
}()
最后的话:永远不要留下悬空的 goroutine
其中一个好的做法是:永远不要留下悬挂的 goroutine。所以当你听完之后,你需要关闭你触发的所有 goroutines。这将通过 worker
:
中的 quit
频道完成
首先我们需要创建一个全局quit
信号通道:
globalQuit := make(chan struct{})
每当我们创建一个 worker 时,我们都会将 globalQuit
通道分配给它作为它的退出信号:
worker.quit = globalQuit
然后当我们想关闭所有 worker 时,我们只需这样做:
close(globalQuit)
由于close
会被所有监听的goroutines识别(这是你理解的点),所以所有的goroutines都会被返回。记住也要关闭你的调度程序例程,但我会把它留给你:)
因为 Go 通道遵循通信顺序进程 (CSP) 模式,所以通道是一个点对点的通信实体。每次交换总是有一位作家和一位 reader 参与。
但是,每个通道 end 可以在多个 goroutine 中 共享。这是安全的 - 没有危险的竞争条件。
所以可以有多个作者共享写作端。 And/or可以有多个reader共享阅读端。我在 different answer 中写了更多相关内容,其中包括示例。
如果你真的需要一个广播,你不能直接这样做,但不难实现一个中间 goroutine,将一个值复制到一组输出通道中的每一个。
广播到频道的一部分并使用 sync.Mutex 管理频道添加和删除可能是您的情况下最简单的方法。
以下是您可以在 golang 中对 broadcast
执行的操作:
- 您可以使用 sync.Cond 广播共享状态更改。这种方式一旦设置就没有任何分配,但你不能添加超时功能或使用另一个通道。
- 您可以广播关闭旧频道的共享状态更改并创建新频道和 sync.Mutex。这样每个状态更改都有一个分配,但您可以添加超时功能并使用另一个通道。
- 您可以广播到一段函数回调并使用 sync.Mutex 来管理它们。来电者可以做频道的事情。这样每个调用者有多个分配,并与另一个通道一起工作。
- 您可以广播到频道的一部分并使用 sync.Mutex 来管理它们。这样每个调用者有多个分配,并与另一个通道一起工作。
- 您可以广播到 sync.WaitGroup 的一部分并使用 sync.Mutex 来管理它们。
更优雅的解决方案是 "broker",客户端可以订阅和取消订阅消息。
为了优雅地处理订阅和取消订阅,我们可以为此使用通道,因此接收和分发消息的代理的主循环可以使用单个 select
语句合并所有这些,同步是从解决方案的性质给出。
另一个技巧是将订阅者存储在映射中,从我们用来向他们分发消息的通道映射。所以使用通道作为映射中的键,然后添加和删除客户端是 "dead" 简单的。这是可能的,因为通道值是 comparable,并且它们的比较非常有效,因为通道值是指向通道描述符的简单指针。
事不宜迟,这里是一个简单的代理实现:
type Broker struct {
stopCh chan struct{}
publishCh chan interface{}
subCh chan chan interface{}
unsubCh chan chan interface{}
}
func NewBroker() *Broker {
return &Broker{
stopCh: make(chan struct{}),
publishCh: make(chan interface{}, 1),
subCh: make(chan chan interface{}, 1),
unsubCh: make(chan chan interface{}, 1),
}
}
func (b *Broker) Start() {
subs := map[chan interface{}]struct{}{}
for {
select {
case <-b.stopCh:
return
case msgCh := <-b.subCh:
subs[msgCh] = struct{}{}
case msgCh := <-b.unsubCh:
delete(subs, msgCh)
case msg := <-b.publishCh:
for msgCh := range subs {
// msgCh is buffered, use non-blocking send to protect the broker:
select {
case msgCh <- msg:
default:
}
}
}
}
}
func (b *Broker) Stop() {
close(b.stopCh)
}
func (b *Broker) Subscribe() chan interface{} {
msgCh := make(chan interface{}, 5)
b.subCh <- msgCh
return msgCh
}
func (b *Broker) Unsubscribe(msgCh chan interface{}) {
b.unsubCh <- msgCh
}
func (b *Broker) Publish(msg interface{}) {
b.publishCh <- msg
}
使用示例:
func main() {
// Create and start a broker:
b := NewBroker()
go b.Start()
// Create and subscribe 3 clients:
clientFunc := func(id int) {
msgCh := b.Subscribe()
for {
fmt.Printf("Client %d got message: %v\n", id, <-msgCh)
}
}
for i := 0; i < 3; i++ {
go clientFunc(i)
}
// Start publishing messages:
go func() {
for msgId := 0; ; msgId++ {
b.Publish(fmt.Sprintf("msg#%d", msgId))
time.Sleep(300 * time.Millisecond)
}
}()
time.Sleep(time.Second)
}
上面的输出将是(在 Go Playground 上尝试):
Client 2 got message: msg#0
Client 0 got message: msg#0
Client 1 got message: msg#0
Client 2 got message: msg#1
Client 0 got message: msg#1
Client 1 got message: msg#1
Client 1 got message: msg#2
Client 2 got message: msg#2
Client 0 got message: msg#2
Client 2 got message: msg#3
Client 0 got message: msg#3
Client 1 got message: msg#3
改进
您可以考虑以下改进。这些可能有用也可能没用,具体取决于您使用经纪人的方式/用途。
Broker.Unsubscribe()
可能会关闭消息通道,表示不再发送消息:
func (b *Broker) Unsubscribe(msgCh chan interface{}) {
b.unsubCh <- msgCh
close(msgCh)
}
这将允许客户端通过消息通道 range
,如下所示:
msgCh := b.Subscribe()
for msg := range msgCh {
fmt.Printf("Client %d got message: %v\n", id, msg)
}
然后如果有人像这样取消订阅 msgCh
:
b.Unsubscribe(msgCh)
上述范围循环将在处理完调用 Unsubscribe()
之前发送的所有消息后终止。
如果您希望您的客户依赖于关闭的消息通道,并且代理的生命周期比您的应用程序的生命周期短,那么您也可以在代理停止时关闭所有订阅的客户端,在 Start()
方法是这样的:
case <-b.stopCh:
for msgCh := range subs {
close(msgCh)
}
return
这是一个迟到的答案,但我认为它可能会安抚一些好奇的读者。
Go 通道在并发方面受到广泛欢迎。
Go社区死板地遵循这句话:
Do not communicate by sharing memory; instead, share memory by communicating.
我对此完全中立,我认为在广播方面应该考虑其他选项而不是 well-defined channels
。
这是我的看法:来自同步包的 Cond 是 widely overlooked。按照 Bronze man 的建议实施 braodcaster 在非常相同的上下文中值得注意。
我很高兴 icza 建议使用频道并通过频道广播消息。我遵循相同的方法并使用同步的条件变量:
// Broadcaster is the struct which encompasses broadcasting
type Broadcaster struct {
cond *sync.Cond
subscribers map[interface{}]func(interface{})
message interface{}
running bool
}
这是我们整个广播概念所依赖的主要结构。
下面,我为这个结构定义了一些行为。简而言之,订阅者应该能够被添加、删除,并且整个过程应该是可撤销的。
// SetupBroadcaster gives the broadcaster object to be used further in messaging
func SetupBroadcaster() *Broadcaster {
return &Broadcaster{
cond: sync.NewCond(&sync.RWMutex{}),
subscribers: map[interface{}]func(interface{}){},
}
}
// Subscribe let others enroll in broadcast event!
func (b *Broadcaster) Subscribe(id interface{}, f func(input interface{})) {
b.subscribers[id] = f
}
// Unsubscribe stop receiving broadcasting
func (b *Broadcaster) Unsubscribe(id interface{}) {
b.cond.L.Lock()
delete(b.subscribers, id)
b.cond.L.Unlock()
}
// Publish publishes the message
func (b *Broadcaster) Publish(message interface{}) {
go func() {
b.cond.L.Lock()
b.message = message
b.cond.Broadcast()
b.cond.L.Unlock()
}()
}
// Start the main broadcasting event
func (b *Broadcaster) Start() {
b.running = true
for b.running {
b.cond.L.Lock()
b.cond.Wait()
go func() {
for _, f := range b.subscribers {
f(b.message) // publishes the message
}
}()
b.cond.L.Unlock()
}
}
// Stop broadcasting event
func (b *Broadcaster) Stop() {
b.running = false
}
接下来,我可以很轻松地使用它:
messageToaster := func(message interface{}) {
fmt.Printf("[New Message]: %v\n", message)
}
unwillingReceiver := func(message interface{}) {
fmt.Println("Do not disturb!")
}
broadcaster := SetupBroadcaster()
broadcaster.Subscribe(1, messageToaster)
broadcaster.Subscribe(2, messageToaster)
broadcaster.Subscribe(3, unwillingReceiver)
go broadcaster.Start()
broadcaster.Publish("Hello!")
time.Sleep(time.Second)
broadcaster.Unsubscribe(3)
broadcaster.Publish("Goodbye!")
它应该以任何顺序打印如下内容:
[New Message]: Hello!
Do not disturb!
[New Message]: Hello!
[New Message]: Goodbye!
[New Message]: Goodbye!
上看到这个
再举一个简单的例子:
https://play.golang.org
type Broadcaster struct {
mu sync.Mutex
clients map[int64]chan struct{}
}
func NewBroadcaster() *Broadcaster {
return &Broadcaster{
clients: make(map[int64]chan struct{}),
}
}
func (b *Broadcaster) Subscribe(id int64) (<-chan struct{}, error) {
defer b.mu.Unlock()
b.mu.Lock()
s := make(chan struct{}, 1)
if _, ok := b.clients[id]; ok {
return nil, fmt.Errorf("signal %d already exist", id)
}
b.clients[id] = s
return b.clients[id], nil
}
func (b *Broadcaster) Unsubscribe(id int64) {
defer b.mu.Unlock()
b.mu.Lock()
if _, ok := b.clients[id]; ok {
close(b.clients[id])
}
delete(b.clients, id)
}
func (b *Broadcaster) broadcast() {
defer b.mu.Unlock()
b.mu.Lock()
for k := range b.clients {
if len(b.clients[k]) == 0 {
b.clients[k] <- struct{}{}
}
}
}
type testClient struct {
name string
signal <-chan struct{}
signalID int64
brd *Broadcaster
}
func (c *testClient) doWork() {
i := 0
for range c.signal {
fmt.Println(c.name, "do work", i)
if i > 2 {
c.brd.Unsubscribe(c.signalID)
fmt.Println(c.name, "unsubscribed")
}
i++
}
fmt.Println(c.name, "done")
}
func main() {
var err error
brd := NewBroadcaster()
clients := make([]*testClient, 0)
for i := 0; i < 3; i++ {
c := &testClient{
name: fmt.Sprint("client:", i),
signalID: time.Now().UnixNano()+int64(i), // +int64(i) for play.golang.org
brd: brd,
}
c.signal, err = brd.Subscribe(c.signalID)
if err != nil {
log.Fatal(err)
}
clients = append(clients, c)
}
for i := 0; i < len(clients); i++ {
go clients[i].doWork()
}
for i := 0; i < 6; i++ {
brd.broadcast()
time.Sleep(time.Second)
}
}
输出:
client:0 do work 0
client:2 do work 0
client:1 do work 0
client:2 do work 1
client:0 do work 1
client:1 do work 1
client:2 do work 2
client:0 do work 2
client:1 do work 2
client:2 do work 3
client:2 unsubscribed
client:2 done
client:0 do work 3
client:0 unsubscribed
client:0 done
client:1 do work 3
client:1 unsubscribed
client:1 done
我是新来的,我正在尝试创建一个简单的聊天服务器,客户端可以在其中向所有连接的客户端广播消息。
在我的服务器中,我有一个接受连接的 goroutine(无限循环),所有连接都由一个通道接收。
go func() {
for {
conn, _ := listener.Accept()
ch <- conn
}
}()
然后,我为每个连接的客户端启动一个处理程序(goroutine)。在处理程序内部,我尝试通过遍历通道向所有连接广播。
for c := range ch {
conn.Write(msg)
}
但是,我无法广播,因为(我认为从阅读文档来看)在迭代之前需要关闭频道。我不确定什么时候应该关闭频道,因为我想不断接受新连接,而关闭频道不会让我这样做。如果有人可以帮助我,或者提供一种更好的方式向所有连接的客户端广播消息,我们将不胜感激。
你做的是扇出模式,也就是说,多个端点正在监听一个输入源。这种模式的结果是,只要输入源中有消息,这些侦听器中只有一个能够获得消息。唯一的例外是 close
频道。 close
将被所有听众识别,因此 "broadcast".
但是你想要做的是广播一条从连接中读取的消息,所以我们可以这样做:
当听众数量已知时
让每个worker收听专用广播频道,并将消息从主频道发送到每个专用广播频道。
type worker struct {
source chan interface{}
quit chan struct{}
}
func (w *worker) Start() {
w.source = make(chan interface{}, 10) // some buffer size to avoid blocking
go func() {
for {
select {
case msg := <-w.source
// do something with msg
case <-quit: // will explain this in the last section
return
}
}
}()
}
然后我们可以有一群工人:
workers := []*worker{&worker{}, &worker{}}
for _, worker := range workers { worker.Start() }
然后启动我们的监听器:
go func() {
for {
conn, _ := listener.Accept()
ch <- conn
}
}()
还有一个调度员:
go func() {
for {
msg := <- ch
for _, worker := workers {
worker.source <- msg
}
}
}()
当听众数量未知时
在这种情况下,上面给出的解决方案仍然有效。唯一不同的是,每当你需要一个新的 worker 时,你需要创建一个新的 worker,启动它,然后将它推入 workers
切片。但是这个方法需要一个线程安全的切片,它需要一个锁。其中一种实现可能如下所示:
type threadSafeSlice struct {
sync.Mutex
workers []*worker
}
func (slice *threadSafeSlice) Push(w *worker) {
slice.Lock()
defer slice.Unlock()
workers = append(workers, w)
}
func (slice *threadSafeSlice) Iter(routine func(*worker)) {
slice.Lock()
defer slice.Unlock()
for _, worker := range workers {
routine(worker)
}
}
每当你想启动一个工人时:
w := &worker{}
w.Start()
threadSafeSlice.Push(w)
您的调度员将更改为:
go func() {
for {
msg := <- ch
threadSafeSlice.Iter(func(w *worker) { w.source <- msg })
}
}()
最后的话:永远不要留下悬空的 goroutine
其中一个好的做法是:永远不要留下悬挂的 goroutine。所以当你听完之后,你需要关闭你触发的所有 goroutines。这将通过 worker
:
quit
频道完成
首先我们需要创建一个全局quit
信号通道:
globalQuit := make(chan struct{})
每当我们创建一个 worker 时,我们都会将 globalQuit
通道分配给它作为它的退出信号:
worker.quit = globalQuit
然后当我们想关闭所有 worker 时,我们只需这样做:
close(globalQuit)
由于close
会被所有监听的goroutines识别(这是你理解的点),所以所有的goroutines都会被返回。记住也要关闭你的调度程序例程,但我会把它留给你:)
因为 Go 通道遵循通信顺序进程 (CSP) 模式,所以通道是一个点对点的通信实体。每次交换总是有一位作家和一位 reader 参与。
但是,每个通道 end 可以在多个 goroutine 中 共享。这是安全的 - 没有危险的竞争条件。
所以可以有多个作者共享写作端。 And/or可以有多个reader共享阅读端。我在 different answer 中写了更多相关内容,其中包括示例。
如果你真的需要一个广播,你不能直接这样做,但不难实现一个中间 goroutine,将一个值复制到一组输出通道中的每一个。
广播到频道的一部分并使用 sync.Mutex 管理频道添加和删除可能是您的情况下最简单的方法。
以下是您可以在 golang 中对 broadcast
执行的操作:
- 您可以使用 sync.Cond 广播共享状态更改。这种方式一旦设置就没有任何分配,但你不能添加超时功能或使用另一个通道。
- 您可以广播关闭旧频道的共享状态更改并创建新频道和 sync.Mutex。这样每个状态更改都有一个分配,但您可以添加超时功能并使用另一个通道。
- 您可以广播到一段函数回调并使用 sync.Mutex 来管理它们。来电者可以做频道的事情。这样每个调用者有多个分配,并与另一个通道一起工作。
- 您可以广播到频道的一部分并使用 sync.Mutex 来管理它们。这样每个调用者有多个分配,并与另一个通道一起工作。
- 您可以广播到 sync.WaitGroup 的一部分并使用 sync.Mutex 来管理它们。
更优雅的解决方案是 "broker",客户端可以订阅和取消订阅消息。
为了优雅地处理订阅和取消订阅,我们可以为此使用通道,因此接收和分发消息的代理的主循环可以使用单个 select
语句合并所有这些,同步是从解决方案的性质给出。
另一个技巧是将订阅者存储在映射中,从我们用来向他们分发消息的通道映射。所以使用通道作为映射中的键,然后添加和删除客户端是 "dead" 简单的。这是可能的,因为通道值是 comparable,并且它们的比较非常有效,因为通道值是指向通道描述符的简单指针。
事不宜迟,这里是一个简单的代理实现:
type Broker struct {
stopCh chan struct{}
publishCh chan interface{}
subCh chan chan interface{}
unsubCh chan chan interface{}
}
func NewBroker() *Broker {
return &Broker{
stopCh: make(chan struct{}),
publishCh: make(chan interface{}, 1),
subCh: make(chan chan interface{}, 1),
unsubCh: make(chan chan interface{}, 1),
}
}
func (b *Broker) Start() {
subs := map[chan interface{}]struct{}{}
for {
select {
case <-b.stopCh:
return
case msgCh := <-b.subCh:
subs[msgCh] = struct{}{}
case msgCh := <-b.unsubCh:
delete(subs, msgCh)
case msg := <-b.publishCh:
for msgCh := range subs {
// msgCh is buffered, use non-blocking send to protect the broker:
select {
case msgCh <- msg:
default:
}
}
}
}
}
func (b *Broker) Stop() {
close(b.stopCh)
}
func (b *Broker) Subscribe() chan interface{} {
msgCh := make(chan interface{}, 5)
b.subCh <- msgCh
return msgCh
}
func (b *Broker) Unsubscribe(msgCh chan interface{}) {
b.unsubCh <- msgCh
}
func (b *Broker) Publish(msg interface{}) {
b.publishCh <- msg
}
使用示例:
func main() {
// Create and start a broker:
b := NewBroker()
go b.Start()
// Create and subscribe 3 clients:
clientFunc := func(id int) {
msgCh := b.Subscribe()
for {
fmt.Printf("Client %d got message: %v\n", id, <-msgCh)
}
}
for i := 0; i < 3; i++ {
go clientFunc(i)
}
// Start publishing messages:
go func() {
for msgId := 0; ; msgId++ {
b.Publish(fmt.Sprintf("msg#%d", msgId))
time.Sleep(300 * time.Millisecond)
}
}()
time.Sleep(time.Second)
}
上面的输出将是(在 Go Playground 上尝试):
Client 2 got message: msg#0
Client 0 got message: msg#0
Client 1 got message: msg#0
Client 2 got message: msg#1
Client 0 got message: msg#1
Client 1 got message: msg#1
Client 1 got message: msg#2
Client 2 got message: msg#2
Client 0 got message: msg#2
Client 2 got message: msg#3
Client 0 got message: msg#3
Client 1 got message: msg#3
改进
您可以考虑以下改进。这些可能有用也可能没用,具体取决于您使用经纪人的方式/用途。
Broker.Unsubscribe()
可能会关闭消息通道,表示不再发送消息:
func (b *Broker) Unsubscribe(msgCh chan interface{}) {
b.unsubCh <- msgCh
close(msgCh)
}
这将允许客户端通过消息通道 range
,如下所示:
msgCh := b.Subscribe()
for msg := range msgCh {
fmt.Printf("Client %d got message: %v\n", id, msg)
}
然后如果有人像这样取消订阅 msgCh
:
b.Unsubscribe(msgCh)
上述范围循环将在处理完调用 Unsubscribe()
之前发送的所有消息后终止。
如果您希望您的客户依赖于关闭的消息通道,并且代理的生命周期比您的应用程序的生命周期短,那么您也可以在代理停止时关闭所有订阅的客户端,在 Start()
方法是这样的:
case <-b.stopCh:
for msgCh := range subs {
close(msgCh)
}
return
这是一个迟到的答案,但我认为它可能会安抚一些好奇的读者。
Go 通道在并发方面受到广泛欢迎。
Go社区死板地遵循这句话:
Do not communicate by sharing memory; instead, share memory by communicating.
我对此完全中立,我认为在广播方面应该考虑其他选项而不是 well-defined channels
。
这是我的看法:来自同步包的 Cond 是 widely overlooked。按照 Bronze man 的建议实施 braodcaster 在非常相同的上下文中值得注意。
我很高兴 icza 建议使用频道并通过频道广播消息。我遵循相同的方法并使用同步的条件变量:
// Broadcaster is the struct which encompasses broadcasting
type Broadcaster struct {
cond *sync.Cond
subscribers map[interface{}]func(interface{})
message interface{}
running bool
}
这是我们整个广播概念所依赖的主要结构。
下面,我为这个结构定义了一些行为。简而言之,订阅者应该能够被添加、删除,并且整个过程应该是可撤销的。
// SetupBroadcaster gives the broadcaster object to be used further in messaging
func SetupBroadcaster() *Broadcaster {
return &Broadcaster{
cond: sync.NewCond(&sync.RWMutex{}),
subscribers: map[interface{}]func(interface{}){},
}
}
// Subscribe let others enroll in broadcast event!
func (b *Broadcaster) Subscribe(id interface{}, f func(input interface{})) {
b.subscribers[id] = f
}
// Unsubscribe stop receiving broadcasting
func (b *Broadcaster) Unsubscribe(id interface{}) {
b.cond.L.Lock()
delete(b.subscribers, id)
b.cond.L.Unlock()
}
// Publish publishes the message
func (b *Broadcaster) Publish(message interface{}) {
go func() {
b.cond.L.Lock()
b.message = message
b.cond.Broadcast()
b.cond.L.Unlock()
}()
}
// Start the main broadcasting event
func (b *Broadcaster) Start() {
b.running = true
for b.running {
b.cond.L.Lock()
b.cond.Wait()
go func() {
for _, f := range b.subscribers {
f(b.message) // publishes the message
}
}()
b.cond.L.Unlock()
}
}
// Stop broadcasting event
func (b *Broadcaster) Stop() {
b.running = false
}
接下来,我可以很轻松地使用它:
messageToaster := func(message interface{}) {
fmt.Printf("[New Message]: %v\n", message)
}
unwillingReceiver := func(message interface{}) {
fmt.Println("Do not disturb!")
}
broadcaster := SetupBroadcaster()
broadcaster.Subscribe(1, messageToaster)
broadcaster.Subscribe(2, messageToaster)
broadcaster.Subscribe(3, unwillingReceiver)
go broadcaster.Start()
broadcaster.Publish("Hello!")
time.Sleep(time.Second)
broadcaster.Unsubscribe(3)
broadcaster.Publish("Goodbye!")
它应该以任何顺序打印如下内容:
[New Message]: Hello!
Do not disturb!
[New Message]: Hello!
[New Message]: Goodbye!
[New Message]: Goodbye!
上看到这个
再举一个简单的例子: https://play.golang.org
type Broadcaster struct {
mu sync.Mutex
clients map[int64]chan struct{}
}
func NewBroadcaster() *Broadcaster {
return &Broadcaster{
clients: make(map[int64]chan struct{}),
}
}
func (b *Broadcaster) Subscribe(id int64) (<-chan struct{}, error) {
defer b.mu.Unlock()
b.mu.Lock()
s := make(chan struct{}, 1)
if _, ok := b.clients[id]; ok {
return nil, fmt.Errorf("signal %d already exist", id)
}
b.clients[id] = s
return b.clients[id], nil
}
func (b *Broadcaster) Unsubscribe(id int64) {
defer b.mu.Unlock()
b.mu.Lock()
if _, ok := b.clients[id]; ok {
close(b.clients[id])
}
delete(b.clients, id)
}
func (b *Broadcaster) broadcast() {
defer b.mu.Unlock()
b.mu.Lock()
for k := range b.clients {
if len(b.clients[k]) == 0 {
b.clients[k] <- struct{}{}
}
}
}
type testClient struct {
name string
signal <-chan struct{}
signalID int64
brd *Broadcaster
}
func (c *testClient) doWork() {
i := 0
for range c.signal {
fmt.Println(c.name, "do work", i)
if i > 2 {
c.brd.Unsubscribe(c.signalID)
fmt.Println(c.name, "unsubscribed")
}
i++
}
fmt.Println(c.name, "done")
}
func main() {
var err error
brd := NewBroadcaster()
clients := make([]*testClient, 0)
for i := 0; i < 3; i++ {
c := &testClient{
name: fmt.Sprint("client:", i),
signalID: time.Now().UnixNano()+int64(i), // +int64(i) for play.golang.org
brd: brd,
}
c.signal, err = brd.Subscribe(c.signalID)
if err != nil {
log.Fatal(err)
}
clients = append(clients, c)
}
for i := 0; i < len(clients); i++ {
go clients[i].doWork()
}
for i := 0; i < 6; i++ {
brd.broadcast()
time.Sleep(time.Second)
}
}
输出:
client:0 do work 0
client:2 do work 0
client:1 do work 0
client:2 do work 1
client:0 do work 1
client:1 do work 1
client:2 do work 2
client:0 do work 2
client:1 do work 2
client:2 do work 3
client:2 unsubscribed
client:2 done
client:0 do work 3
client:0 unsubscribed
client:0 done
client:1 do work 3
client:1 unsubscribed
client:1 done