GoLang pubsub 服务器停止监听新的频道订阅
GoLang pubsub server stops listening to new channel subscriptions
我的 go websocket 应用程序中有一个 redis pubsub 连接,因此每当客户端连接并订阅频道时,它都会监听并发送消息。
但是,假设客户端 1 订阅了频道 X
,pubsub 开始监听并接收来自它的消息。
现在,客户端 1 也订阅了频道 Y
,因此服务器也应该收听来自该频道的消息,但是它停止收听 X
,只收听 Y
。
for {
switch v := gPubSubConn.Receive().(type) {
case redis.Message:
log.Printf("Received message from %s", v.Channel)
subscriptions := ps.GetSubscriptions(v.Channel, nil)
for _, sub := range subscriptions {
if v.Channel == types.TaskResults {
go sendTaskResultMessage(v.Data, sub)
} else if v.Channel == types.TaskCount {
go sendTaskCountMessage(v.Data, sub)
}
}
case redis.Subscription:
log.Printf("Subscription message: %s: %s %d\n", v.Channel, v.Kind, v.Count)
case error:
log.Println("Error pub/sub, delivery stopped")
return
}
这是一个示例日志输出
go-1 | New Client is connected, total: 1
go-1 | 2022/02/16 17:36:03 signature is invalid
go-1 | 2022/02/16 17:36:03 Subscription message: task_count: subscribe 1
go-1 | 2022/02/16 17:36:06 Received message from task_count
go-1 | 2022/02/16 17:36:06 Received message from task_count
go-1 | New Client is connected, total: 2
go-1 | 2022/02/16 17:36:14 signature is invalid
go-1 | 2022/02/16 17:36:14 Subscription message: task_results: subscribe 1
go-1 | 2022/02/16 17:36:16 Received message from task_count
go-1 | 2022/02/16 17:36:16 Received message from task_results
go-1 | 2022/02/16 17:36:16 Received message from task_results
go-1 | 2022/02/16 17:36:21 Received message from task_results
go-1 | 2022/02/16 17:36:21 Received message from task_results
go-1 | 2022/02/16 17:36:26 Received message from task_results
go-1 | 2022/02/16 17:36:26 Received message from task_results
go-1 | 2022/02/16 17:36:31 Received message from task_results
go-1 | 2022/02/16 17:36:31 Received message from task_results
有什么想法吗?
根据评论编辑:
type PubSub struct {
Clients []Client
Subscriptions []Subscription
}
type Client struct {
Id string
Connection *websocket.Conn
}
type Message struct {
Action string `json:"action"`
Topic string `json:"topic"`
Message json.RawMessage `json:"message"`
Token string `json:"token"`
}
type Subscription struct {
Topic string
Client *Client
UserId string
}
func (ps *PubSub) GetSubscriptions(topic string, client *Client) []Subscription {
var subscriptionList []Subscription
for _, subscription := range ps.Subscriptions {
if client != nil {
if subscription.Client.Id == client.Id && subscription.Topic == topic {
subscriptionList = append(subscriptionList, subscription)
}
} else {
if subscription.Topic == topic {
subscriptionList = append(subscriptionList, subscription)
}
}
}
return subscriptionList
}
这是我的 websocket 处理程序
func websocketHandler(w http.ResponseWriter, r *http.Request) {
gRedisConn, err := gRedisConn()
if err != nil {
log.Panic(err)
}
gPubSubConn = &redis.PubSubConn{Conn: gRedisConn.Get()}
upgrader.CheckOrigin = func(r *http.Request) bool {
return true
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
client := pubsub.Client{
Id: autoId(),
Connection: conn,
}
// add this client into the list
ps.AddClient(client)
fmt.Println("New Client is connected, total: ", len(ps.Clients))
for {
messageType, p, err := conn.ReadMessage()
if err != nil {
log.Println("Something went wrong", err)
ps.RemoveClient(client)
log.Println("total clients and subscriptions ", len(ps.Clients), len(ps.Subscriptions))
return
}
go listenToMessages()
ps.HandleReceiveMessage(client, messageType, p, gPubSubConn)
}
}
func (ps *PubSub) HandleReceiveMessage(client Client, messageType int, payload []byte, gPubSubConn *redis.PubSubConn) *PubSub {
m := Message{}
err := json.Unmarshal(payload, &m)
if err != nil {
fmt.Println("This is not correct message payload")
return ps
}
switch m.Action {
case PUBLISH:
ps.Publish(m.Topic, m.Message, nil)
case SUBSCRIBE:
ps.Subscribe(&client, m.Topic, gPubSubConn, m.Token)
case UNSUBSCRIBE:
fmt.Println("Client want to unsubscribe the topic", m.Topic, client.Id)
default:
break
}
return ps
}
func (ps *PubSub) Subscribe(client *Client, topic string, gPubSubConn *redis.PubSubConn, token string) *PubSub {
clientSubs := ps.GetSubscriptions(topic, client)
if len(clientSubs) > 0 {
return ps
}
userId := utils.GetUser(token)
newSubscription := Subscription{
Topic: topic,
Client: client,
UserId: userId,
}
ps.Subscriptions = append(ps.Subscriptions, newSubscription)
if err := gPubSubConn.Subscribe(topic); err != nil {
log.Panic(err)
}
return ps
}
眼前的问题是由 websocketHandler
中的这一行引起的:
gPubSubConn = &redis.PubSubConn{Conn: gRedisConn.Get()}
此行将当前的 pubsub 连接替换为新连接。新连接没有任何订阅。之前的连接泄露了。
在应用程序启动时创建一次 pubsub 连接。
该应用程序至少存在一次数据争用。 运行 带有 race detector 的应用程序并修复报告的问题。
我的 go websocket 应用程序中有一个 redis pubsub 连接,因此每当客户端连接并订阅频道时,它都会监听并发送消息。
但是,假设客户端 1 订阅了频道 X
,pubsub 开始监听并接收来自它的消息。
现在,客户端 1 也订阅了频道 Y
,因此服务器也应该收听来自该频道的消息,但是它停止收听 X
,只收听 Y
。
for {
switch v := gPubSubConn.Receive().(type) {
case redis.Message:
log.Printf("Received message from %s", v.Channel)
subscriptions := ps.GetSubscriptions(v.Channel, nil)
for _, sub := range subscriptions {
if v.Channel == types.TaskResults {
go sendTaskResultMessage(v.Data, sub)
} else if v.Channel == types.TaskCount {
go sendTaskCountMessage(v.Data, sub)
}
}
case redis.Subscription:
log.Printf("Subscription message: %s: %s %d\n", v.Channel, v.Kind, v.Count)
case error:
log.Println("Error pub/sub, delivery stopped")
return
}
这是一个示例日志输出
go-1 | New Client is connected, total: 1
go-1 | 2022/02/16 17:36:03 signature is invalid
go-1 | 2022/02/16 17:36:03 Subscription message: task_count: subscribe 1
go-1 | 2022/02/16 17:36:06 Received message from task_count
go-1 | 2022/02/16 17:36:06 Received message from task_count
go-1 | New Client is connected, total: 2
go-1 | 2022/02/16 17:36:14 signature is invalid
go-1 | 2022/02/16 17:36:14 Subscription message: task_results: subscribe 1
go-1 | 2022/02/16 17:36:16 Received message from task_count
go-1 | 2022/02/16 17:36:16 Received message from task_results
go-1 | 2022/02/16 17:36:16 Received message from task_results
go-1 | 2022/02/16 17:36:21 Received message from task_results
go-1 | 2022/02/16 17:36:21 Received message from task_results
go-1 | 2022/02/16 17:36:26 Received message from task_results
go-1 | 2022/02/16 17:36:26 Received message from task_results
go-1 | 2022/02/16 17:36:31 Received message from task_results
go-1 | 2022/02/16 17:36:31 Received message from task_results
有什么想法吗?
根据评论编辑:
type PubSub struct {
Clients []Client
Subscriptions []Subscription
}
type Client struct {
Id string
Connection *websocket.Conn
}
type Message struct {
Action string `json:"action"`
Topic string `json:"topic"`
Message json.RawMessage `json:"message"`
Token string `json:"token"`
}
type Subscription struct {
Topic string
Client *Client
UserId string
}
func (ps *PubSub) GetSubscriptions(topic string, client *Client) []Subscription {
var subscriptionList []Subscription
for _, subscription := range ps.Subscriptions {
if client != nil {
if subscription.Client.Id == client.Id && subscription.Topic == topic {
subscriptionList = append(subscriptionList, subscription)
}
} else {
if subscription.Topic == topic {
subscriptionList = append(subscriptionList, subscription)
}
}
}
return subscriptionList
}
这是我的 websocket 处理程序
func websocketHandler(w http.ResponseWriter, r *http.Request) {
gRedisConn, err := gRedisConn()
if err != nil {
log.Panic(err)
}
gPubSubConn = &redis.PubSubConn{Conn: gRedisConn.Get()}
upgrader.CheckOrigin = func(r *http.Request) bool {
return true
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
client := pubsub.Client{
Id: autoId(),
Connection: conn,
}
// add this client into the list
ps.AddClient(client)
fmt.Println("New Client is connected, total: ", len(ps.Clients))
for {
messageType, p, err := conn.ReadMessage()
if err != nil {
log.Println("Something went wrong", err)
ps.RemoveClient(client)
log.Println("total clients and subscriptions ", len(ps.Clients), len(ps.Subscriptions))
return
}
go listenToMessages()
ps.HandleReceiveMessage(client, messageType, p, gPubSubConn)
}
}
func (ps *PubSub) HandleReceiveMessage(client Client, messageType int, payload []byte, gPubSubConn *redis.PubSubConn) *PubSub {
m := Message{}
err := json.Unmarshal(payload, &m)
if err != nil {
fmt.Println("This is not correct message payload")
return ps
}
switch m.Action {
case PUBLISH:
ps.Publish(m.Topic, m.Message, nil)
case SUBSCRIBE:
ps.Subscribe(&client, m.Topic, gPubSubConn, m.Token)
case UNSUBSCRIBE:
fmt.Println("Client want to unsubscribe the topic", m.Topic, client.Id)
default:
break
}
return ps
}
func (ps *PubSub) Subscribe(client *Client, topic string, gPubSubConn *redis.PubSubConn, token string) *PubSub {
clientSubs := ps.GetSubscriptions(topic, client)
if len(clientSubs) > 0 {
return ps
}
userId := utils.GetUser(token)
newSubscription := Subscription{
Topic: topic,
Client: client,
UserId: userId,
}
ps.Subscriptions = append(ps.Subscriptions, newSubscription)
if err := gPubSubConn.Subscribe(topic); err != nil {
log.Panic(err)
}
return ps
}
眼前的问题是由 websocketHandler
中的这一行引起的:
gPubSubConn = &redis.PubSubConn{Conn: gRedisConn.Get()}
此行将当前的 pubsub 连接替换为新连接。新连接没有任何订阅。之前的连接泄露了。
在应用程序启动时创建一次 pubsub 连接。
该应用程序至少存在一次数据争用。 运行 带有 race detector 的应用程序并修复报告的问题。