这是基于消息总线的发布-订阅模式吗?
Is this a message bus based pub-sub pattern?
根据这个回答:
A Message Bus is a messaging infrastructure to allow different systems to communicate through a shared set of interfaces(message bus).
下面是由 main() 启动的 createHub() 函数和 Run() 方法,用于创建消息中心以与多个订阅者通信:
// PubHub is the central hub that fans envelopes out to registered subscribers.
// In the code shown here, the subscribers set is only read and written by Run,
// which is driven through the three exported channels.
type PubHub struct {
	// subscribers is the set of currently registered subscriptions.
	subscribers map[*subscriptionmediator.HandlerSubscription]struct{}
	// Register receives subscriptions to add to the set.
	Register chan *subscriptionmediator.HandlerSubscription
	// Unregister receives subscriptions to remove from the set.
	Unregister chan *subscriptionmediator.HandlerSubscription
	// Broadcast receives envelopes that Run forwards to every subscriber.
	// The element type is *events.Envelope so that it matches the channel
	// built in createHub and HandlerSubscription.DataChannel.
	Broadcast chan *events.Envelope
}
// createHub allocates a PubHub with an empty subscriber set and unbuffered
// Register, Unregister and Broadcast channels. It does not start Run.
func createHub() *PubHub {
	hub := new(PubHub)
	hub.subscribers = make(map[*subscriptionmediator.HandlerSubscription]struct{})
	hub.Register = make(chan *subscriptionmediator.HandlerSubscription)
	hub.Unregister = make(chan *subscriptionmediator.HandlerSubscription)
	hub.Broadcast = make(chan *events.Envelope)
	return hub
}
// Run is the hub's event loop. It loops forever, handling one request at a
// time from the Register, Unregister and Broadcast channels, and never returns.
func (h *PubHub) Run() {
	for {
		select {
		case sub := <-h.Register:
			h.subscribers[sub] = struct{}{}
		case sub := <-h.Unregister:
			// delete is a no-op when the key is absent, so no existence
			// check is needed first.
			delete(h.subscribers, sub)
		case msg := <-h.Broadcast:
			for sub := range h.subscribers {
				// NOTE(review): this send blocks until the subscriber
				// receives. While it is blocked the loop handles no other
				// Register, Unregister or Broadcast request, so one slow
				// subscriber stalls delivery to all the others.
				sub.DataChannel <- msg
			}
		}
	}
}
每个订阅者的注册方式如下所示:
// Build the subscription for this connection with an unbuffered data channel.
// Keyed fields keep the literal valid if HandlerSubscription's fields are
// reordered or extended (go vet flags unkeyed literals of imported structs).
subscription := &subscriptionmediator.HandlerSubscription{
	ConnInstance: conn,
	DataChannel:  make(chan *events.Envelope),
}
// Hand the subscription to the hub. If hub came from createHub, Register is
// unbuffered and this send blocks until Run receives it.
hub.Register <- subscription
DataChannel 用于发布者与多个订阅者之间的通信:
// HandlerSubscription pairs a websocket connection with the channel on which
// envelopes for that subscriber are delivered.
type HandlerSubscription struct {
	// ConnInstance is the websocket connection associated with this subscription.
	ConnInstance *websocket.Conn
	// DataChannel carries envelopes to this subscriber; PubHub.Run sends on it.
	DataChannel chan *events.Envelope
}
1) 上面的代码是否可以视为遵循基于消息总线的发布-订阅模式?
2) 如何避免某一个订阅者在通道发送(subscriber.DataChannel <- message)处阻塞,从而导致其他所有订阅者都收不到消息?
Can the above code be considered following message bus based pub-sub pattern?
是的。
根据这个回答:
A Message Bus is a messaging infrastructure to allow different systems to communicate through a shared set of interfaces(message bus).
下面是由 main() 启动的 createHub() 函数和 Run() 方法,用于创建消息中心以与多个订阅者通信:
// PubHub is the central hub that fans envelopes out to registered subscribers.
// In the code shown here, the subscribers set is only read and written by Run,
// which is driven through the three exported channels.
type PubHub struct {
	// subscribers is the set of currently registered subscriptions.
	subscribers map[*subscriptionmediator.HandlerSubscription]struct{}
	// Register receives subscriptions to add to the set.
	Register chan *subscriptionmediator.HandlerSubscription
	// Unregister receives subscriptions to remove from the set.
	Unregister chan *subscriptionmediator.HandlerSubscription
	// Broadcast receives envelopes that Run forwards to every subscriber.
	// The element type is *events.Envelope so that it matches the channel
	// built in createHub and HandlerSubscription.DataChannel.
	Broadcast chan *events.Envelope
}
// createHub allocates a PubHub with an empty subscriber set and unbuffered
// Register, Unregister and Broadcast channels. It does not start Run.
func createHub() *PubHub {
	hub := new(PubHub)
	hub.subscribers = make(map[*subscriptionmediator.HandlerSubscription]struct{})
	hub.Register = make(chan *subscriptionmediator.HandlerSubscription)
	hub.Unregister = make(chan *subscriptionmediator.HandlerSubscription)
	hub.Broadcast = make(chan *events.Envelope)
	return hub
}
// Run is the hub's event loop. It loops forever, handling one request at a
// time from the Register, Unregister and Broadcast channels, and never returns.
func (h *PubHub) Run() {
	for {
		select {
		case sub := <-h.Register:
			h.subscribers[sub] = struct{}{}
		case sub := <-h.Unregister:
			// delete is a no-op when the key is absent, so no existence
			// check is needed first.
			delete(h.subscribers, sub)
		case msg := <-h.Broadcast:
			for sub := range h.subscribers {
				// NOTE(review): this send blocks until the subscriber
				// receives. While it is blocked the loop handles no other
				// Register, Unregister or Broadcast request, so one slow
				// subscriber stalls delivery to all the others.
				sub.DataChannel <- msg
			}
		}
	}
}
每个订阅者的注册方式如下所示:
// Build the subscription for this connection with an unbuffered data channel.
// Keyed fields keep the literal valid if HandlerSubscription's fields are
// reordered or extended (go vet flags unkeyed literals of imported structs).
subscription := &subscriptionmediator.HandlerSubscription{
	ConnInstance: conn,
	DataChannel:  make(chan *events.Envelope),
}
// Hand the subscription to the hub. If hub came from createHub, Register is
// unbuffered and this send blocks until Run receives it.
hub.Register <- subscription
DataChannel 用于发布者与多个订阅者之间的通信:
// HandlerSubscription pairs a websocket connection with the channel on which
// envelopes for that subscriber are delivered.
type HandlerSubscription struct {
	// ConnInstance is the websocket connection associated with this subscription.
	ConnInstance *websocket.Conn
	// DataChannel carries envelopes to this subscriber; PubHub.Run sends on it.
	DataChannel chan *events.Envelope
}
1) 上面的代码是否可以视为遵循基于消息总线的发布-订阅模式?
2) 如何避免某一个订阅者在通道发送(subscriber.DataChannel <- message)处阻塞,从而导致其他所有订阅者都收不到消息?
Can the above code be considered following message bus based pub-sub pattern?
是的。