GoLang pubsub 服务器停止监听新的频道订阅

GoLang pubsub server stops listening to new channel subscriptions

我的 go websocket 应用程序中有一个 redis pubsub 连接,因此每当客户端连接并订阅频道时,它都会监听并发送消息。 但是,假设客户端 1 订阅了频道 X,pubsub 开始监听并接收来自它的消息。

现在,客户端 1 又订阅了频道 Y,因此服务器应该同时监听这两个频道的消息;但实际上它停止了对 X 的监听,只监听 Y。

    // Receive loop: reads the package-level gPubSubConn on every iteration and
    // dispatches on the dynamic type of whatever Receive returns. Because the
    // variable is re-read each time, reassigning gPubSubConn elsewhere changes
    // which connection this loop listens on.
    for {
        switch v := gPubSubConn.Receive().(type) {
        case redis.Message:
            // A payload published on a subscribed channel.
            log.Printf("Received message from %s", v.Channel)
            // A nil client selects the subscriptions of every client for this channel.
            subscriptions := ps.GetSubscriptions(v.Channel, nil)
            for _, sub := range subscriptions {
                // Only the two known channels are forwarded; a message on any
                // other channel is dropped. Each send runs in its own goroutine.
                if v.Channel == types.TaskResults {
                    go sendTaskResultMessage(v.Data, sub)
                } else if v.Channel == types.TaskCount {
                    go sendTaskCountMessage(v.Data, sub)
                }
            }
        case redis.Subscription:
            // Subscription notification; only logged (channel, kind, count).
            log.Printf("Subscription message: %s: %s %d\n", v.Channel, v.Kind, v.Count)
        case error:
            // Any error ends the loop; the error value itself is not logged.
            log.Println("Error pub/sub, delivery stopped")
            return
        }

这是一个示例日志输出

go-1  | New Client is connected, total:  1
go-1  | 2022/02/16 17:36:03 signature is invalid
go-1  | 2022/02/16 17:36:03 Subscription message: task_count: subscribe 1
go-1  | 2022/02/16 17:36:06 Received message from task_count
go-1  | 2022/02/16 17:36:06 Received message from task_count
go-1  | New Client is connected, total:  2
go-1  | 2022/02/16 17:36:14 signature is invalid
go-1  | 2022/02/16 17:36:14 Subscription message: task_results: subscribe 1
go-1  | 2022/02/16 17:36:16 Received message from task_count
go-1  | 2022/02/16 17:36:16 Received message from task_results
go-1  | 2022/02/16 17:36:16 Received message from task_results
go-1  | 2022/02/16 17:36:21 Received message from task_results
go-1  | 2022/02/16 17:36:21 Received message from task_results
go-1  | 2022/02/16 17:36:26 Received message from task_results
go-1  | 2022/02/16 17:36:26 Received message from task_results
go-1  | 2022/02/16 17:36:31 Received message from task_results
go-1  | 2022/02/16 17:36:31 Received message from task_results

有什么想法吗?

根据评论编辑:

// PubSub holds the in-memory state of the websocket pub/sub hub: the known
// clients and their topic subscriptions.
type PubSub struct {
    // Clients lists the registered clients.
    Clients       []Client
    // Subscriptions lists every (topic, client) subscription; Subscribe appends
    // to it and GetSubscriptions scans it.
    Subscriptions []Subscription
}

// Client is one websocket peer known to the hub.
type Client struct {
    // Id identifies the client; GetSubscriptions compares clients by this value.
    Id         string
    // Connection is the client's websocket connection.
    Connection *websocket.Conn
}

// Message is the JSON envelope decoded from each websocket frame by
// HandleReceiveMessage.
type Message struct {
    // Action selects the operation (compared against PUBLISH, SUBSCRIBE and
    // UNSUBSCRIBE).
    Action  string          `json:"action"`
    // Topic is the topic the action applies to.
    Topic   string          `json:"topic"`
    // Message is the payload, kept as raw JSON and not decoded here.
    Message json.RawMessage `json:"message"`
    // Token is passed to utils.GetUser when subscribing.
    Token   string          `json:"token"`
}

// Subscription records that one client is subscribed to one topic.
type Subscription struct {
    // Topic is the subscribed topic name.
    Topic  string
    // Client points at the subscribing client.
    Client *Client
    // UserId is the value utils.GetUser returned for the subscriber's token.
    UserId string
}

// GetSubscriptions returns the subscriptions registered for topic. When client
// is non-nil the result is further restricted to subscriptions whose client has
// the same Id; when client is nil, subscriptions of every client are returned.
// The result is nil when nothing matches.
func (ps *PubSub) GetSubscriptions(topic string, client *Client) []Subscription {
    var matched []Subscription

    for _, candidate := range ps.Subscriptions {
        // The client filter is applied first, before the topic comparison.
        if client != nil && candidate.Client.Id != client.Id {
            continue
        }
        if candidate.Topic != topic {
            continue
        }
        matched = append(matched, candidate)
    }
    return matched
}

这是我的 websocket 处理程序

func websocketHandler(w http.ResponseWriter, r *http.Request) {
    gRedisConn, err := gRedisConn()
    if err != nil {
        log.Panic(err)
    }
    gPubSubConn = &redis.PubSubConn{Conn: gRedisConn.Get()}
    upgrader.CheckOrigin = func(r *http.Request) bool {
        return true

    }
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Println(err)
        return
    }
    client := pubsub.Client{
        Id:         autoId(),
        Connection: conn,
    }

    // add this client into the list
    ps.AddClient(client)

    fmt.Println("New Client is connected, total: ", len(ps.Clients))

    for {
        messageType, p, err := conn.ReadMessage()
        if err != nil {
            log.Println("Something went wrong", err)
            ps.RemoveClient(client)
            log.Println("total clients and subscriptions ", len(ps.Clients), len(ps.Subscriptions))
            return
        }
        go listenToMessages()
        ps.HandleReceiveMessage(client, messageType, p, gPubSubConn)
    }
}
// HandleReceiveMessage decodes payload as a JSON Message and dispatches on its
// Action: PUBLISH forwards the message body to Publish, SUBSCRIBE registers the
// client for the topic via Subscribe, and UNSUBSCRIBE is only reported on
// stdout. Any other action is ignored. A payload that is not valid JSON is
// reported on stdout and otherwise dropped. messageType is accepted but not
// used. The receiver is always returned.
func (ps *PubSub) HandleReceiveMessage(client Client, messageType int, payload []byte, gPubSubConn *redis.PubSubConn) *PubSub {
    var msg Message
    if decodeErr := json.Unmarshal(payload, &msg); decodeErr != nil {
        fmt.Println("This is not correct message payload")
        return ps
    }

    switch msg.Action {
    case PUBLISH:
        ps.Publish(msg.Topic, msg.Message, nil)
    case SUBSCRIBE:
        ps.Subscribe(&client, msg.Topic, gPubSubConn, msg.Token)
    case UNSUBSCRIBE:
        fmt.Println("Client want to unsubscribe the topic", msg.Topic, client.Id)
    }
    return ps
}
// Subscribe registers client for topic and subscribes gPubSubConn to the
// matching Redis channel. It is a no-op when the client already has a
// subscription for the topic.
//
// The subscription is recorded only after Redis has accepted it, so a failed
// Redis call leaves no stale entry in ps.Subscriptions. A failure is logged
// rather than raised as a panic, which would unwind the caller without giving
// it a chance to clean up the client. The receiver is always returned.
func (ps *PubSub) Subscribe(client *Client, topic string, gPubSubConn *redis.PubSubConn, token string) *PubSub {
    if len(ps.GetSubscriptions(topic, client)) > 0 {
        return ps
    }

    // Resolved before talking to Redis, as before, so any output from GetUser
    // keeps its position relative to the subscribe call.
    userID := utils.GetUser(token)

    if err := gPubSubConn.Subscribe(topic); err != nil {
        log.Printf("subscribing to %q: %v", topic, err)
        return ps
    }

    ps.Subscriptions = append(ps.Subscriptions, Subscription{
        Topic:  topic,
        Client: client,
        UserId: userID,
    })
    return ps
}

眼前的问题是由 websocketHandler 中的这一行引起的:

gPubSubConn = &redis.PubSubConn{Conn: gRedisConn.Get()}

此行用一个新连接替换了当前的 pubsub 连接。新连接上没有任何订阅,而之前的连接则被泄漏了。

应当在应用程序启动时只创建一次 pubsub 连接,并让所有客户端共用它。

该应用程序至少存在一处数据竞争。请启用竞态检测器(race detector,即 `-race` 标志)运行应用程序,并修复它报告的问题。