从 Redis 退订似乎不起作用
Unsubscribe from Redis doesn't seem to work
我正在尝试在 Redis 中使用 pub-sub。我所做的是打开两个 redis-cli
。
第一个我发出命令 flushall
以确保启动绿色。
然后在另一个终端中,我打开 MONITOR 以打印来自我的 Golang 示例客户端(代码如下)的所有命令。
这是我从 MONITOR 打印的内容:
1590207069.340860 [0 127.0.0.1:58910] "smembers" "user:Ali:rooms"
1590207069.341380 [0 127.0.0.1:58910] "sadd" "user:Ali:rooms" "New"
1590207069.345266 [0 127.0.0.1:58910] "smembers" "user:Ali:rooms"
1590207069.353706 [0 127.0.0.1:58910] "sadd" "user:Ali:rooms" "Old"
1590207069.354219 [0 127.0.0.1:58912] "subscribe" "New"
1590207069.354741 [0 127.0.0.1:58910] "smembers" "user:Ali:rooms"
1590207069.355444 [0 127.0.0.1:58912] "unsubscribe" "New" "Old"
1590207069.356754 [0 127.0.0.1:58910] "sadd" "user:Ali:rooms" "OldPlusPlus"
1590207069.357206 [0 127.0.0.1:58914] "subscribe" "New" "Old"
1590207069.357656 [0 127.0.0.1:58910] "smembers" "user:Ali:rooms"
1590207069.358362 [0 127.0.0.1:58912] "unsubscribe" "OldPlusPlus" "New" "Old"
1590207069.361030 [0 127.0.0.1:58916] "subscribe" "OldPlusPlus" "New" "Old"
我正在尝试让客户端对随着时间的推移打开的所有通道建立一个连接。而不是 connection/thread 来处理到 Redis 的每个通道。因此,每当需要新的订阅请求时,我都会尝试从客户端删除所有以前的订阅,并为旧频道和新频道进行新订阅。
但是 unsubscribe
命令似乎没有按预期工作(或者我遗漏了什么)!
因为当我尝试获取每个频道的客户端数量时,从第一个终端开始:
127.0.0.1:6379> pubsub numsub OldPlusPlus New Old
1) "OldPlusPlus"
2) (integer) 1
3) "New"
4) (integer) 2
5) "Old"
6) (integer) 2
此外,当我尝试向 "New" 频道发送消息时,我的 go 客户端收到了两次消息!
下面是生成上述命令的代码:
package main
import (
"fmt"
"github.com/go-redis/redis/v7"
"log"
)
type user struct {
name string
rooms []string
endSub chan bool
sub bool
}
func (u *user) connect(rdb *redis.Client) error {
// get all user rooms (from DB) and start subscribe
r, err := rdb.SMembers(fmt.Sprintf("user:%s:rooms", u.name)).Result()
if err != nil {
return err
}
u.rooms = r
if len(u.rooms) == 0 {
return nil
}
u.doSubscribe(rdb)
return nil
}
func (u *user) subscribe(room string, rdb *redis.Client) error {
// check if already subscribed
for i := range u.rooms {
if u.rooms[i] == room {
return nil
}
}
// add room to user
userRooms := fmt.Sprintf("user:%s:rooms", u.name)
if err := rdb.SAdd(userRooms, room).Err(); err != nil {
return err
}
// get all user rooms (from DB) and start subscribe
r, err := rdb.SMembers(userRooms).Result()
if err != nil {
return err
}
u.rooms = r
if u.sub {
u.endSub <- true
}
u.doSubscribe(rdb)
return nil
}
func (u *user) doSubscribe(rdb *redis.Client) {
sub := rdb.Subscribe(u.rooms...)
go func() {
u.sub = true
fmt.Println("starting the listener for user:", u.name, "on rooms:", u.rooms)
for {
select {
case msg, ok := <-sub.Channel():
if !ok {
break
}
fmt.Println(msg.Payload)
case <-u.endSub:
fmt.Println("Stop listening for user:", u.name, "from rooms:", u.rooms)
if err := sub.Unsubscribe(u.rooms...); err != nil {
fmt.Println("error unsubscribing")
return
}
break
}
}
}()
}
func (u *user) unsubscribe(room string, rdb *redis.Client) error {
return nil
}
var rdb *redis.Client
func main() {
rdb = redis.NewClient(&redis.Options{Addr: "localhost:6379"})
u := &user{
name: "Ali",
endSub: make(chan bool),
}
if err := u.connect(rdb); err != nil {
log.Fatal(err)
}
if err := u.subscribe("New", rdb); err != nil {
log.Fatal(err)
}
if err := u.subscribe("Old", rdb); err != nil {
log.Fatal(err)
}
if err := u.subscribe("OldPlusPlus", rdb); err != nil {
log.Fatal(err)
}
select {}
}
问题与订阅频道的 *redis.PubSub 类型的对象不是用于取消订阅频道的对象有关。
所以我必须维护对此类对象的引用,然后使用该引用取消订阅所有频道。
修改后的代码如下:
package main
import (
"fmt"
"github.com/go-redis/redis/v7"
"log"
)
type user struct {
name string
rooms []string
stopRunning chan bool
running bool
roomsPubsub map[string]*redis.PubSub
}
func (u *user) connect(rdb *redis.Client) error {
// get all user rooms (from DB) and start subscribe
r, err := rdb.SMembers(fmt.Sprintf("user:%s:rooms", u.name)).Result()
if err != nil {
return err
}
u.rooms = r
if len(u.rooms) == 0 {
return nil
}
u.doSubscribe("", rdb)
return nil
}
func (u *user) subscribe(room string, rdb *redis.Client) error {
// check if already subscribed
for i := range u.rooms {
if u.rooms[i] == room {
return nil
}
}
// add room to user
userRooms := fmt.Sprintf("user:%s:rooms", u.name)
if err := rdb.SAdd(userRooms, room).Err(); err != nil {
return err
}
// get all user rooms (from DB) and start subscribe
r, err := rdb.SMembers(userRooms).Result()
if err != nil {
return err
}
u.rooms = r
if u.running {
u.stopRunning <- true
}
u.doSubscribe(room, rdb)
return nil
}
func (u *user) doSubscribe(room string, rdb *redis.Client) {
pubSub := rdb.Subscribe(u.rooms...)
if len(room) > 0 {
u.roomsPubsub[room] = pubSub
}
go func() {
u.running = true
fmt.Println("starting the listener for user:", u.name, "on rooms:", u.rooms)
for {
select {
case msg, ok := <-pubSub.Channel():
if !ok {
break
}
fmt.Println(msg.Payload, msg.Channel)
case <-u.stopRunning:
fmt.Println("Stop listening for user:", u.name, "on old rooms")
for k, v := range u.roomsPubsub {
if err := v.Unsubscribe(); err != nil {
fmt.Println("unable to unsubscribe", err)
}
delete(u.roomsPubsub, k)
}
break
}
}
}()
}
func (u *user) unsubscribe(room string, rdb *redis.Client) error {
return nil
}
var rdb *redis.Client
func main() {
rdb = redis.NewClient(&redis.Options{Addr: "localhost:6379"})
u := &user{
name: "Wael",
stopRunning: make(chan bool),
roomsPubsub: make(map[string]*redis.PubSub),
}
if err := u.connect(rdb); err != nil {
log.Fatal(err)
}
if err := u.subscribe("New", rdb); err != nil {
log.Fatal(err)
}
if err := u.subscribe("Old", rdb); err != nil {
log.Fatal(err)
}
if err := u.subscribe("OldPlusPlus", rdb); err != nil {
log.Fatal(err)
}
select {}
}
我正在尝试在 Redis 中使用 pub-sub。我所做的是打开两个 redis-cli
。
第一个我发出命令 flushall
以确保启动绿色。
然后在另一个终端中,我打开 MONITOR 以打印来自我的 Golang 示例客户端(代码如下)的所有命令。
这是我从 MONITOR 打印的内容:
1590207069.340860 [0 127.0.0.1:58910] "smembers" "user:Ali:rooms"
1590207069.341380 [0 127.0.0.1:58910] "sadd" "user:Ali:rooms" "New"
1590207069.345266 [0 127.0.0.1:58910] "smembers" "user:Ali:rooms"
1590207069.353706 [0 127.0.0.1:58910] "sadd" "user:Ali:rooms" "Old"
1590207069.354219 [0 127.0.0.1:58912] "subscribe" "New"
1590207069.354741 [0 127.0.0.1:58910] "smembers" "user:Ali:rooms"
1590207069.355444 [0 127.0.0.1:58912] "unsubscribe" "New" "Old"
1590207069.356754 [0 127.0.0.1:58910] "sadd" "user:Ali:rooms" "OldPlusPlus"
1590207069.357206 [0 127.0.0.1:58914] "subscribe" "New" "Old"
1590207069.357656 [0 127.0.0.1:58910] "smembers" "user:Ali:rooms"
1590207069.358362 [0 127.0.0.1:58912] "unsubscribe" "OldPlusPlus" "New" "Old"
1590207069.361030 [0 127.0.0.1:58916] "subscribe" "OldPlusPlus" "New" "Old"
我正在尝试让客户端对随着时间的推移打开的所有通道建立一个连接。而不是 connection/thread 来处理到 Redis 的每个通道。因此,每当需要新的订阅请求时,我都会尝试从客户端删除所有以前的订阅,并为旧频道和新频道进行新订阅。
但是 unsubscribe
命令似乎没有按预期工作(或者我遗漏了什么)!
因为当我尝试获取每个频道的客户端数量时,从第一个终端开始:
127.0.0.1:6379> pubsub numsub OldPlusPlus New Old
1) "OldPlusPlus"
2) (integer) 1
3) "New"
4) (integer) 2
5) "Old"
6) (integer) 2
此外,当我尝试向 "New" 频道发送消息时,我的 go 客户端收到了两次消息!
下面是生成上述命令的代码:
package main
import (
"fmt"
"github.com/go-redis/redis/v7"
"log"
)
type user struct {
name string
rooms []string
endSub chan bool
sub bool
}
func (u *user) connect(rdb *redis.Client) error {
// get all user rooms (from DB) and start subscribe
r, err := rdb.SMembers(fmt.Sprintf("user:%s:rooms", u.name)).Result()
if err != nil {
return err
}
u.rooms = r
if len(u.rooms) == 0 {
return nil
}
u.doSubscribe(rdb)
return nil
}
func (u *user) subscribe(room string, rdb *redis.Client) error {
// check if already subscribed
for i := range u.rooms {
if u.rooms[i] == room {
return nil
}
}
// add room to user
userRooms := fmt.Sprintf("user:%s:rooms", u.name)
if err := rdb.SAdd(userRooms, room).Err(); err != nil {
return err
}
// get all user rooms (from DB) and start subscribe
r, err := rdb.SMembers(userRooms).Result()
if err != nil {
return err
}
u.rooms = r
if u.sub {
u.endSub <- true
}
u.doSubscribe(rdb)
return nil
}
func (u *user) doSubscribe(rdb *redis.Client) {
sub := rdb.Subscribe(u.rooms...)
go func() {
u.sub = true
fmt.Println("starting the listener for user:", u.name, "on rooms:", u.rooms)
for {
select {
case msg, ok := <-sub.Channel():
if !ok {
break
}
fmt.Println(msg.Payload)
case <-u.endSub:
fmt.Println("Stop listening for user:", u.name, "from rooms:", u.rooms)
if err := sub.Unsubscribe(u.rooms...); err != nil {
fmt.Println("error unsubscribing")
return
}
break
}
}
}()
}
func (u *user) unsubscribe(room string, rdb *redis.Client) error {
return nil
}
var rdb *redis.Client
func main() {
rdb = redis.NewClient(&redis.Options{Addr: "localhost:6379"})
u := &user{
name: "Ali",
endSub: make(chan bool),
}
if err := u.connect(rdb); err != nil {
log.Fatal(err)
}
if err := u.subscribe("New", rdb); err != nil {
log.Fatal(err)
}
if err := u.subscribe("Old", rdb); err != nil {
log.Fatal(err)
}
if err := u.subscribe("OldPlusPlus", rdb); err != nil {
log.Fatal(err)
}
select {}
}
问题与订阅频道的 *redis.PubSub 类型的对象不是用于取消订阅频道的对象有关。
所以我必须维护对此类对象的引用,然后使用该引用取消订阅所有频道。
修改后的代码如下:
package main
import (
"fmt"
"github.com/go-redis/redis/v7"
"log"
)
type user struct {
name string
rooms []string
stopRunning chan bool
running bool
roomsPubsub map[string]*redis.PubSub
}
func (u *user) connect(rdb *redis.Client) error {
// get all user rooms (from DB) and start subscribe
r, err := rdb.SMembers(fmt.Sprintf("user:%s:rooms", u.name)).Result()
if err != nil {
return err
}
u.rooms = r
if len(u.rooms) == 0 {
return nil
}
u.doSubscribe("", rdb)
return nil
}
func (u *user) subscribe(room string, rdb *redis.Client) error {
// check if already subscribed
for i := range u.rooms {
if u.rooms[i] == room {
return nil
}
}
// add room to user
userRooms := fmt.Sprintf("user:%s:rooms", u.name)
if err := rdb.SAdd(userRooms, room).Err(); err != nil {
return err
}
// get all user rooms (from DB) and start subscribe
r, err := rdb.SMembers(userRooms).Result()
if err != nil {
return err
}
u.rooms = r
if u.running {
u.stopRunning <- true
}
u.doSubscribe(room, rdb)
return nil
}
func (u *user) doSubscribe(room string, rdb *redis.Client) {
pubSub := rdb.Subscribe(u.rooms...)
if len(room) > 0 {
u.roomsPubsub[room] = pubSub
}
go func() {
u.running = true
fmt.Println("starting the listener for user:", u.name, "on rooms:", u.rooms)
for {
select {
case msg, ok := <-pubSub.Channel():
if !ok {
break
}
fmt.Println(msg.Payload, msg.Channel)
case <-u.stopRunning:
fmt.Println("Stop listening for user:", u.name, "on old rooms")
for k, v := range u.roomsPubsub {
if err := v.Unsubscribe(); err != nil {
fmt.Println("unable to unsubscribe", err)
}
delete(u.roomsPubsub, k)
}
break
}
}
}()
}
func (u *user) unsubscribe(room string, rdb *redis.Client) error {
return nil
}
var rdb *redis.Client
func main() {
rdb = redis.NewClient(&redis.Options{Addr: "localhost:6379"})
u := &user{
name: "Wael",
stopRunning: make(chan bool),
roomsPubsub: make(map[string]*redis.PubSub),
}
if err := u.connect(rdb); err != nil {
log.Fatal(err)
}
if err := u.subscribe("New", rdb); err != nil {
log.Fatal(err)
}
if err := u.subscribe("Old", rdb); err != nil {
log.Fatal(err)
}
if err := u.subscribe("OldPlusPlus", rdb); err != nil {
log.Fatal(err)
}
select {}
}