Golang channels getting stuck

I'm using Go and Redis to send a queue (channel) of messages to subscribers. My goal is to create an auto-scaling solution that spawns new goroutines (up to a limit) as the queue grows. I have the following code:

// Set up max queued messages
var maxMessages = float64(100000)

// Set up max redis senders
var maxSender = float64(5)

// Set up message channel
var messages = make(chan Message, int(maxMessages))

// Set up messages per sender count
var senderRatio = maxSender / maxMessages

type Message struct {
    ChatId  int    `json:"chatId"`
    UserId  int    `json:"userId"`
    Message string `json:"message"`
    Date    int    `json:"date"`
}

func RedisWriteHandler(messageChannel chan Message) {
    senderCount := 0
    killswitch := make(chan string)
    for {
        length := float64(len(messageChannel))
        neededSenders := int(math.Ceil(length * senderRatio))
        if senderCount < neededSenders || senderCount < 1 {
            log.Printf("Increasing sender count to %d, need %d", senderCount+1, neededSenders)
            go addRedisSender(messageChannel, killswitch)
            senderCount++
        } else if senderCount > neededSenders && senderCount > 1 {
            log.Printf("Decreasing sender count to %d, need %d", senderCount-1, neededSenders)
            killMessage := fmt.Sprintf("only need %d senders", neededSenders)
            killswitch <- killMessage
            senderCount--
        }
    }
    log.Fatal("The redis handler unexpectedly went away")
}

func addRedisSender(messageChannel chan Message, killswitch chan string) {
    c, err := redis.Dial("tcp", "localhost:6379")
    if err != nil {
        log.Println(err)
        return
    }
    defer c.Close()
    for {
        select {
        case msg := <-messageChannel:
            redisChannel := strconv.Itoa(msg.ChatId)
            messageBlob, err := json.Marshal(msg)
            if err != nil {
                log.Println(err)
            }
            _, err = c.Do("PUBLISH", redisChannel, messageBlob)
            if err != nil {
                log.Println(err)
            }
        case kill := <-killswitch:
            log.Printf("Sender killed: %s", kill)
            return
        }
    }
    log.Println("Closing redis sender")
}

If I run this code with a channel with a large buffer (say 100,000 messages), it adds the 5 senders in one go and starts working through the queue - so far so good. However, at a seemingly random point - around 1,500 messages - it hangs. No more logs at all (and I think I've covered every exit point). My expected output is for the sender count to climb to the maxSender value and then drop back down periodically over the course of the run. Below is a sample of the logs I get with 100k messages

2018/05/02 08:21:25 Increasing sender count to 1, need 5
2018/05/02 08:21:25 Increasing sender count to 2, need 5
2018/05/02 08:21:25 Increasing sender count to 3, need 5
2018/05/02 08:21:25 Increasing sender count to 4, need 5
2018/05/02 08:21:25 Increasing sender count to 5, need 5

Then nothing.

I can see from other testing I'm running that this isn't just going slowly - the messages simply aren't being taken from the channel. Can anyone shed some light on this?

Thanks,

Sam

EDIT

I've been lurking on some other questions about processes hanging for no apparent reason, and the suggested action was to kill the parent process to get a stack trace for each of the children. Below is the output.

kill -ABRT 9162
SIGABRT: abort
PC=0x65f199 m=0 sigcode=0

goroutine 6 [running]:
main.RedisWriteHandler(0xc42005a180)
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:116 +0x99 fp=0xc42004dfd8 sp=0xc42004dee8 pc=0x65f199
runtime.goexit()
    /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42004dfe0 sp=0xc42004dfd8 pc=0x458f01
created by main.main
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:236 +0x130

goroutine 1 [IO wait]:
internal/poll.runtime_pollWait(0x7f03114c5f70, 0x72, 0xffffffffffffffff)
    /usr/local/go/src/runtime/netpoll.go:173 +0x57
internal/poll.(*pollDesc).wait(0xc422722098, 0x72, 0xc420038b00, 0x0, 0x0)
    /usr/local/go/src/internal/poll/fd_poll_runtime.go:85 +0xae
internal/poll.(*pollDesc).waitRead(0xc422722098, 0xffffffffffffff00, 0x0, 0x0)
    /usr/local/go/src/internal/poll/fd_poll_runtime.go:90 +0x3d
internal/poll.(*FD).Accept(0xc422722080, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
    /usr/local/go/src/internal/poll/fd_unix.go:334 +0x1e2
net.(*netFD).accept(0xc422722080, 0x7f0311511000, 0x0, 0x7139b0)
    /usr/local/go/src/net/fd_unix.go:238 +0x42
net.(*TCPListener).accept(0xc4200b2000, 0xc420038d80, 0x4122c8, 0x30)
    /usr/local/go/src/net/tcpsock_posix.go:136 +0x2e
net.(*TCPListener).AcceptTCP(0xc4200b2000, 0xc422736120, 0xc422736120, 0x6a3b00)
    /usr/local/go/src/net/tcpsock.go:234 +0x49
net/http.tcpKeepAliveListener.Accept(0xc4200b2000, 0xc42001c0d8, 0x6a3b00, 0x8813d0, 0x6f2aa0)
    /usr/local/go/src/net/http/server.go:3120 +0x2f
net/http.(*Server).Serve(0xc42007a4e0, 0x8575c0, 0xc4200b2000, 0x0, 0x0)
    /usr/local/go/src/net/http/server.go:2695 +0x1b2
net/http.(*Server).ListenAndServe(0xc42007a4e0, 0xc42007a4e0, 0xc420038f00)
    /usr/local/go/src/net/http/server.go:2636 +0xa9
net/http.ListenAndServe(0x6ff124, 0x5, 0x0, 0x0, 0x0, 0x0)
    /usr/local/go/src/net/http/server.go:2882 +0x7f
main.main()
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:239 +0x15b

goroutine 21 [runnable]:
internal/poll.runtime_pollWait(0x7f03114c5eb0, 0x72, 0x0)
    /usr/local/go/src/runtime/netpoll.go:173 +0x57
internal/poll.(*pollDesc).wait(0xc422722198, 0x72, 0xffffffffffffff00, 0x854d00, 0x851550)
    /usr/local/go/src/internal/poll/fd_poll_runtime.go:85 +0xae
internal/poll.(*pollDesc).waitRead(0xc422722198, 0xc4227ab000, 0x1000, 0x1000)
    /usr/local/go/src/internal/poll/fd_poll_runtime.go:90 +0x3d
internal/poll.(*FD).Read(0xc422722180, 0xc4227ab000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
    /usr/local/go/src/internal/poll/fd_unix.go:125 +0x18a
net.(*netFD).Read(0xc422722180, 0xc4227ab000, 0x1000, 0x1000, 0x4815d4, 0x47fd05, 0x1)
    /usr/local/go/src/net/fd_unix.go:202 +0x52
net.(*conn).Read(0xc4227a6000, 0xc4227ab000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
    /usr/local/go/src/net/net.go:176 +0x6d
bufio.(*Reader).fill(0xc4227ae000)
    /usr/local/go/src/bufio/bufio.go:97 +0x11a
bufio.(*Reader).ReadSlice(0xc4227ae000, 0x7f03114c5e0a, 0x0, 0x459fd6, 0xc42276db60, 0x491d6d, 0xc422722180)
    /usr/local/go/src/bufio/bufio.go:338 +0x2c
github.com/gomodule/redigo/redis.(*conn).readLine(0xc4227b0000, 0x0, 0x8000000000000000, 0xc422722180, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:431 +0x38
github.com/gomodule/redigo/redis.(*conn).readReply(0xc4227b0000, 0x0, 0x0, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:504 +0x40
github.com/gomodule/redigo/redis.(*conn).DoWithTimeout(0xc4227b0000, 0x0, 0x6ff70a, 0x7, 0xc422800d40, 0x2, 0x2, 0x68f280, 0x696f01, 0xc422800d60, ...)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:665 +0x164
github.com/gomodule/redigo/redis.(*conn).Do(0xc4227b0000, 0x6ff70a, 0x7, 0xc422800d40, 0x2, 0x2, 0x0, 0xc4227ba9a0, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:616 +0x73
main.addRedisSender(0xc42005a180, 0xc42001e240)
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:150 +0x569
created by main.RedisWriteHandler
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:119 +0x381

goroutine 22 [IO wait]:
internal/poll.runtime_pollWait(0x7f03114c5c70, 0x72, 0x0)
    /usr/local/go/src/runtime/netpoll.go:173 +0x57
internal/poll.(*pollDesc).wait(0xc422758098, 0x72, 0xffffffffffffff00, 0x854d00, 0x851550)
    /usr/local/go/src/internal/poll/fd_poll_runtime.go:85 +0xae
internal/poll.(*pollDesc).waitRead(0xc422758098, 0xc422774000, 0x1000, 0x1000)
    /usr/local/go/src/internal/poll/fd_poll_runtime.go:90 +0x3d
internal/poll.(*FD).Read(0xc422758080, 0xc422774000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
    /usr/local/go/src/internal/poll/fd_unix.go:125 +0x18a
net.(*netFD).Read(0xc422758080, 0xc422774000, 0x1000, 0x1000, 0x4815d4, 0x47fd05, 0x1)
    /usr/local/go/src/net/fd_unix.go:202 +0x52
net.(*conn).Read(0xc42274c020, 0xc422774000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
    /usr/local/go/src/net/net.go:176 +0x6d
bufio.(*Reader).fill(0xc422748120)
    /usr/local/go/src/bufio/bufio.go:97 +0x11a
bufio.(*Reader).ReadSlice(0xc422748120, 0x7f03114c5c0a, 0x0, 0x459fd6, 0xc420049b60, 0x491d6d, 0xc422758080)
    /usr/local/go/src/bufio/bufio.go:338 +0x2c
github.com/gomodule/redigo/redis.(*conn).readLine(0xc420084820, 0x0, 0x8000000000000000, 0xc422758080, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:431 +0x38
github.com/gomodule/redigo/redis.(*conn).readReply(0xc420084820, 0x0, 0x0, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:504 +0x40
github.com/gomodule/redigo/redis.(*conn).DoWithTimeout(0xc420084820, 0x0, 0x6ff70a, 0x7, 0xc422800dc0, 0x2, 0x2, 0x68f280, 0x696f01, 0xc422800de0, ...)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:665 +0x164
github.com/gomodule/redigo/redis.(*conn).Do(0xc420084820, 0x6ff70a, 0x7, 0xc422800dc0, 0x2, 0x2, 0x0, 0xc4227ba9b0, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:616 +0x73
main.addRedisSender(0xc42005a180, 0xc42001e240)
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:150 +0x569
created by main.RedisWriteHandler
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:119 +0x381

goroutine 23 [runnable]:
syscall.Syscall(0x0, 0x7, 0xc42277a000, 0x1000, 0x4, 0x1000, 0x0)
    /usr/local/go/src/syscall/asm_linux_amd64.s:18 +0x5
syscall.read(0x7, 0xc42277a000, 0x1000, 0x1000, 0xc42276f900, 0x0, 0x0)
    /usr/local/go/src/syscall/zsyscall_linux_amd64.go:756 +0x55
syscall.Read(0x7, 0xc42277a000, 0x1000, 0x1000, 0xcb, 0xc4228072b0, 0xcb)
    /usr/local/go/src/syscall/syscall_unix.go:162 +0x49
internal/poll.(*FD).Read(0xc422722280, 0xc42277a000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
    /usr/local/go/src/internal/poll/fd_unix.go:121 +0x125
net.(*netFD).Read(0xc422722280, 0xc42277a000, 0x1000, 0x1000, 0x4815d4, 0x47fd05, 0x1)
    /usr/local/go/src/net/fd_unix.go:202 +0x52
net.(*conn).Read(0xc42274c060, 0xc42277a000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
    /usr/local/go/src/net/net.go:176 +0x6d
bufio.(*Reader).fill(0xc4227481e0)
    /usr/local/go/src/bufio/bufio.go:97 +0x11a
bufio.(*Reader).ReadSlice(0xc4227481e0, 0x7f03114c5d0a, 0x0, 0x459fd6, 0xc42276fb60, 0x491d6d, 0xc422722280)
    /usr/local/go/src/bufio/bufio.go:338 +0x2c
github.com/gomodule/redigo/redis.(*conn).readLine(0xc420084a00, 0x0, 0x8000000000000000, 0xc422722280, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:431 +0x38
github.com/gomodule/redigo/redis.(*conn).readReply(0xc420084a00, 0x0, 0x0, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:504 +0x40
github.com/gomodule/redigo/redis.(*conn).DoWithTimeout(0xc420084a00, 0x0, 0x6ff70a, 0x7, 0xc4227f1900, 0x2, 0x2, 0x68f280, 0x696f01, 0xc4227f1920, ...)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:665 +0x164
github.com/gomodule/redigo/redis.(*conn).Do(0xc420084a00, 0x6ff70a, 0x7, 0xc4227f1900, 0x2, 0x2, 0x0, 0xc4227318e0, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:616 +0x73
main.addRedisSender(0xc42005a180, 0xc42001e240)
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:150 +0x569
created by main.RedisWriteHandler
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:119 +0x381

goroutine 24 [running]:
    goroutine running on other thread; stack unavailable
created by main.RedisWriteHandler
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:119 +0x381

goroutine 25 [runnable]:
syscall.Syscall6(0x37, 0x9, 0x1, 0x4, 0xc42276b3dc, 0xc42276b3d8, 0x0, 0x0, 0x4, 0x0)
    /usr/local/go/src/syscall/asm_linux_amd64.s:44 +0x5
syscall.getsockopt(0x9, 0x1, 0x4, 0xc42276b3dc, 0xc42276b3d8, 0xc422722400, 0x0)
    /usr/local/go/src/syscall/zsyscall_linux_amd64.go:1605 +0x7c
syscall.GetsockoptInt(0x9, 0x1, 0x4, 0x1, 0x0, 0x0)
    /usr/local/go/src/syscall/syscall_unix.go:245 +0x63
net.(*netFD).connect(0xc422722480, 0x857a00, 0xc42001c0d8, 0x0, 0x0, 0x853a80, 0xc422764100, 0x0, 0x0, 0x0, ...)
    /usr/local/go/src/net/fd_unix.go:160 +0x2f7
net.(*netFD).dial(0xc422722480, 0x857a00, 0xc42001c0d8, 0x858c00, 0x0, 0x858c00, 0xc422736510, 0xc42276b610, 0x54d3fe)
    /usr/local/go/src/net/sock_posix.go:142 +0xe9
net.socket(0x857a00, 0xc42001c0d8, 0x6fee50, 0x3, 0x2, 0x1, 0x0, 0x0, 0x858c00, 0x0, ...)
    /usr/local/go/src/net/sock_posix.go:93 +0x1a5
net.internetSocket(0x857a00, 0xc42001c0d8, 0x6fee50, 0x3, 0x858c00, 0x0, 0x858c00, 0xc422736510, 0x1, 0x0, ...)
    /usr/local/go/src/net/ipsock_posix.go:141 +0x129
net.doDialTCP(0x857a00, 0xc42001c0d8, 0x6fee50, 0x3, 0x0, 0xc422736510, 0xc42276b7e0, 0x0, 0xf2)
    /usr/local/go/src/net/tcpsock_posix.go:62 +0xb9
net.dialTCP(0x857a00, 0xc42001c0d8, 0x6fee50, 0x3, 0x0, 0xc422736510, 0x44a6b8, 0xad3bb25ed8, 0x2e6f67bd)
    /usr/local/go/src/net/tcpsock_posix.go:58 +0xe4
net.dialSingle(0x857a00, 0xc42001c0d8, 0xc422722400, 0x8559c0, 0xc422736510, 0x0, 0x0, 0x0, 0x0)
    /usr/local/go/src/net/dial.go:547 +0x3e2
net.dialSerial(0x857a00, 0xc42001c0d8, 0xc422722400, 0xc420010c80, 0x1, 0x1, 0x0, 0x0, 0x0, 0x0)
    /usr/local/go/src/net/dial.go:515 +0x247
net.(*Dialer).DialContext(0xc422766240, 0x857a00, 0xc42001c0d8, 0x6fee50, 0x3, 0x701517, 0xe, 0x0, 0x0, 0x0, ...)
    /usr/local/go/src/net/dial.go:397 +0x6ee
net.(*Dialer).Dial(0xc422766240, 0x6fee50, 0x3, 0x701517, 0xe, 0x10, 0x6b5de0, 0xc420082001, 0xc420010c40)
    /usr/local/go/src/net/dial.go:320 +0x75
net.(*Dialer).Dial-fm(0x6fee50, 0x3, 0x701517, 0xe, 0x3, 0xc42273cc50, 0x42a568, 0xc42273cc18)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:180 +0x52
github.com/gomodule/redigo/redis.Dial(0x6fee50, 0x3, 0x701517, 0xe, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:183 +0x182
main.addRedisSender(0xc42005a180, 0xc42001e240)
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:133 +0x12e
created by main.RedisWriteHandler
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:119 +0x381

rax    0x5
rbx    0xf3e31
rcx    0x5
rdx    0xc42005a180
rdi    0x458f01
rsi    0x3
rbp    0xc42004dfc8
rsp    0xc42004dee8
r8     0x0
r9     0x0
r10    0x732f60
r11    0x30
r12    0x0
r13    0xf1
r14    0x11
r15    0x0
rip    0x65f199
rflags 0x246
cs     0x33
fs     0x0
gs     0x0

Goroutine 22 is a bit interesting in that it's in [IO wait] while the others are running or runnable. I haven't dealt with any of these states before - is that the problem here?
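As an aside, when the runtime isn't completely wedged, the same kind of goroutine dump can be produced from inside the process via the runtime/pprof goroutine profile; a minimal sketch (the SIGUSR1 trigger and the helper name are illustrative assumptions, not from the original code, and it needs the os, os/signal, runtime/pprof and syscall imports):

// dumpStacksOnSignal prints every goroutine's stack to stderr whenever
// the process receives SIGUSR1, so a stuck process can be inspected
// without killing it.
func dumpStacksOnSignal() {
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGUSR1)
    go func() {
        for range sigs {
            // debug level 2 prints full stacks, similar to a crash traceback
            pprof.Lookup("goroutine").WriteTo(os.Stderr, 2)
        }
    }()
}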

The latest code is here: https://gitlab.com/samisagit/go-im-server/blob/changed-redis-handler-buggy/src/main.go

EDIT - ROOT CAUSE!

Someone on the golang-nuts group suggested this might be a GC issue, and he was right (thanks Michael)! When running with the code below

debug.SetGCPercent(-1)

it works as expected - this isn't a long-term solution, but it points to where the problem lies! If anyone knows why the GC is so keen to do its duty, I'd appreciate the insight!
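For context, a minimal sketch of where such a call might sit, assuming it is placed at the top of main (the surrounding lines are illustrative):

package main

import "runtime/debug"

func main() {
    // Disable the collector entirely for this diagnostic run.
    // SetGCPercent(-1) turns GC off and returns the previous setting,
    // which could be restored later with another call.
    debug.SetGCPercent(-1)

    // ... start RedisWriteHandler, the HTTP server, etc.
}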

Based on this

I can see from other testing I'm running that this isn't just going slowly and that the messages just aren't being taken from the channel.

I think you have a live-lock in addRedisSender().

The select statement will pseudo-randomly select one of its cases: the killswitch case or the <-messageChannel case. Except there is another case, default, which always fires when neither channel is ready. That means the for { loop will consume all the available resources and cause a live-lock as the Go runtime tries to schedule more and more competing goroutines.

If you remove the default: continue case, the for loop will block on the select until there is either a message to read or a kill signal.
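To illustrate, a minimal sketch of the two shapes of that loop (the default branch here is an assumption about the linked version of addRedisSender, since the code pasted in the question doesn't show it; publish is a hypothetical stand-in for the PUBLISH call):

// Spinning form: when neither channel is ready, default fires
// immediately and the loop burns CPU instead of parking.
for {
    select {
    case msg := <-messageChannel:
        publish(msg) // hypothetical stand-in for the redis PUBLISH
    case kill := <-killswitch:
        log.Printf("Sender killed: %s", kill)
        return
    default:
        continue
    }
}

// Blocking form: with no default case, the goroutine parks on the
// select until a message or a kill signal actually arrives.
for {
    select {
    case msg := <-messageChannel:
        publish(msg)
    case kill := <-killswitch:
        log.Printf("Sender killed: %s", kill)
        return
    }
}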

For future reference, this ended up being the first for loop in RedisWriteHandler. There are no blocking statements in that loop (such as a select), so it runs 'infinitely' and eats up a huge amount of resources, which causes the GC to shut it down. When I first found this I thought I'd posted the answer here, but apparently not.
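One way to stop that first loop from spinning is to re-check the queue length on a timer instead of continuously; a rough sketch of that idea (the one-second interval is an arbitrary choice, not from the original code, and it needs the time import):

func RedisWriteHandler(messageChannel chan Message) {
    senderCount := 0
    killswitch := make(chan string)
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    for range ticker.C { // parks here between checks instead of spinning
        length := float64(len(messageChannel))
        neededSenders := int(math.Ceil(length * senderRatio))
        if senderCount < neededSenders || senderCount < 1 {
            log.Printf("Increasing sender count to %d, need %d", senderCount+1, neededSenders)
            go addRedisSender(messageChannel, killswitch)
            senderCount++
        } else if senderCount > neededSenders && senderCount > 1 {
            log.Printf("Decreasing sender count to %d, need %d", senderCount-1, neededSenders)
            killswitch <- fmt.Sprintf("only need %d senders", neededSenders)
            senderCount--
        }
    }
}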