RabbitMQ 队列长度始终为 0

RabbitMQ Queue Length is always 0

我正在编写一个应用程序,但我遇到了这个问题,一遍又一遍地查看代码,似乎没有任何问题,使用下面的基本代码段进行了测试,问题是可重现的....RabbitMQ 说队列是没有的时候总是空的。

下面的 Golang 代码片段显示生产者发送消息的频率高于消费者使用消息的频率。消费者始终处于活动状态,但睡眠时间更长,以使队列在其积压中有消息。结果?消费者每次尝试时都会获取消息,但是 API 总是说没有消息 -> 消息计数为 0。

package main

import (
    "encoding/json"
    "fmt"
    "github.com/streadway/amqp"
    "io/ioutil"
    "net/http"
    "testing"
    "time"
)
func main() {

    username := "guest"
    password := "guest"
    scheme := "amqp"
    rabbitMqHost := "localhost"
    port := "5672"

    connectionString := fmt.Sprintf("%s://%s:%s@%s:%s/", scheme, username, password, rabbitMqHost, port)

    conn, err := amqp.Dial(connectionString)

    if err != nil {
        panic(err)
    }

    ch, err := conn.Channel()
    if err != nil {
        panic(err)
    }

    exchangeName := "my-exchange"
    // Declare exchange
    err = ch.ExchangeDeclare(
        exchangeName, // name
        "fanout",     // type
        true,         // durable
        true,         // auto-deleted
        false,        // internal
        false,        // no-wait
        nil,          // arguments
    )

    if err != nil {
        panic(err)
    }

    // Create first Queue
    queueName := "my-queue"
    q, err := ch.QueueDeclare(
        queueName, // name
        true,      // durable
        true,      // delete when unsused
        false,     // exclusive
        false,     // no-wait
        nil,       // arguments
    )

    if err != nil {
        panic(err)
    }

    // Bind Exchange to Queue
    err = ch.QueueBind(
        q.Name,       // queue name
        "",           // routing key
        exchangeName, // exchange
        false,
        nil,
    )

    // Listen
    eventQueue, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )

    if err != nil {
        panic(err)
    }

    go func() {
        for a := range eventQueue {
            fmt.Printf("Received Event %s\n", string(a.Body))
            time.Sleep(time.Second * 4)
        }
    }()

    go func() {
        count := 0
        for {
            err = ch.Publish(exchangeName, "", false, false, amqp.Publishing{
                ContentType: "application/json",
                Body:        []byte(fmt.Sprintf("Message %d", count)),
            })

            fmt.Printf("Sent Message %d\n", count)
            count++
            if err != nil {
                panic(err)
            }
            time.Sleep(time.Second * 2)
        }
    }()

    for {
        httpRes, err := http.Get("http://guest:guest@localhost:15672/api/queues/%2f/my-queue")
        if err != nil {
            panic(err)
        }

        var resJson map[string]interface{}
        content, err := ioutil.ReadAll(httpRes.Body)
        if err != nil {
            panic(err)
        }
        httpRes.Body.Close()
        err = json.Unmarshal(content, &resJson)

        if err != nil {
            panic(err)
        }

        q2, err := ch.QueueDeclarePassive(
            queueName, // name
            true,      // durable
            true,      // delete when unsused
            false,     // exclusive
            false,     // no-wait
            nil,
        )
        fmt.Printf("Queue Len: %f - %d\n", resJson["messages"], q2.Messages)
        time.Sleep(time.Second)
    }

}

您可以使用以下 RabbitMQ 服务器进行测试:

docker run --rm --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

输出:

Sent Message 0
Received Event Message 0
Queue Len: %!f(<nil>) - 0
Queue Len: %!f(<nil>) - 0
Sent Message 1
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 1
Sent Message 2
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 3
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 2
Sent Message 4
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 5
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 3
Sent Message 6
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 7
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 4
Sent Message 8
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 9
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 5
Sent Message 10
....
....

Que Len 没有一次说它不是 0。在我的应用程序中,当我放入一堆消息时,我可能会看到它说 X,但很快它就变成了 0,我以为我有一个隐藏的消费者应用程序,但不,API 给出了一些不准确的结果,或者我应该在其他地方寻找长度。

更新

上面只有在有消费者时才会发生,如果队列没有消费者,它会按预期工作,只需注释.Consume代码:

/*
eventQueue, err := ch.Consume(
...
go func(){
for a := range eventQueue {
..
}()
*/

现在它“有所改善”,但首先,它不是我要找的,其次,它仍然是奇怪的输出=S:

Sent Message 0
Queue Len: 0.000000 - 1
Queue Len: 0.000000 - 1
Sent Message 1
Queue Len: 0.000000 - 2
Queue Len: 0.000000 - 2
Sent Message 2
Queue Len: 0.000000 - 3
Queue Len: 1.000000 - 3
Sent Message 3
Queue Len: 1.000000 - 4
Queue Len: 1.000000 - 4
Sent Message 4
Queue Len: 1.000000 - 5
Queue Len: 1.000000 - 5
Sent Message 5
Queue Len: 4.000000 - 6
Queue Len: 4.000000 - 6
Sent Message 6
Queue Len: 4.000000 - 7
Queue Len: 4.000000 - 7
Sent Message 7
Queue Len: 4.000000 - 8
Queue Len: 6.000000 - 8
Sent Message 8
Queue Len: 6.000000 - 9
Queue Len: 6.000000 - 9
Sent Message 9
Queue Len: 6.000000 - 10
Queue Len: 6.000000 - 10
Sent Message 10
Queue Len: 9.000000 - 11
Queue Len: 9.000000 - 11

字段 q2.Messages 不可靠,它是 未等待确认的消息计数 ,即已经确认的消息。

您的消费者声明为 autoAck = true — 即 noAck —,这意味着不需要确认,这意味着已经有零条消息已确认。

当您注释掉消费者时,确认消息的数量可能取决于发布者缓冲区。

使用 AMQP 0.9.1 在给定队列上以编程方式获取精确数量的消息基本上是不可能的。您可以使用管理 API 中的 message_stats 字段代替:

http://localhost:15672/api/queues/vhost/queue_name

接受的解决方案将是 blackgreen 的。证明是下面的替换,只需将问题部分中的消费者和发布者代码替换为:

// Listen
    eventQueue, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack <-- Difference
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )

    if err != nil {
        panic(err)
    }

    go func() {

        for a := range eventQueue {
            err = ch.Ack(a.DeliveryTag, false) // <-- Difference
            if err != nil {
                panic(err)
            }
            fmt.Printf("Received Event %s\n", string(a.Body))
            time.Sleep(time.Second * 4)
        }
    }()

    go func() {
        count := 0
        for {
            err = ch.Publish(exchangeName, "", false, false, amqp.Publishing{
                ContentType: "application/json",
                Body:        []byte(fmt.Sprintf("Message %d", count)),
            })

            fmt.Printf("Sent Message %d\n", count)
            count++
            if err != nil {
                panic(err)
            }
            if count >= 20 { // <-- Difference
                break
            }
            time.Sleep(time.Second * 2)
        }
    }()

输出:

.... The increase in the queue length
Sent Message 13
Queue Len: 8.000000 - 0
Queue Len: 8.000000 - 0
Received Event Message 4
Sent Message 14
Queue Len: 8.000000 - 0
Queue Len: 9.000000 - 0
Sent Message 15
Queue Len: 9.000000 - 0
Queue Len: 9.000000 - 0
Received Event Message 5
Sent Message 16
Queue Len: 9.000000 - 0
Queue Len: 9.000000 - 0
Sent Message 17
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Received Event Message 6
Sent Message 18
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Sent Message 19
Queue Len: 11.000000 - 0
Queue Len: 12.000000 - 0
Received Event Message 7
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Received Event Message 8
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Received Event Message 9
Queue Len: 12.000000 - 0
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Received Event Message 10
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Queue Len: 10.000000 - 0
Queue Len: 10.000000 - 0
Received Event Message 11
Queue Len: 10.000000 - 0
Queue Len: 10.000000 - 0
Queue Len: 10.000000 - 0
Queue Len: 9.000000 - 0
Received Event Message 12
....
As publisher exits it decreases, the consumer catches up and message len decreases:
Received Event Message 16
Queue Len: 5.000000 - 0
Queue Len: 5.000000 - 0
Queue Len: 5.000000 - 0
Queue Len: 4.000000 - 0
Received Event Message 17
Queue Len: 4.000000 - 0
Queue Len: 4.000000 - 0
Queue Len: 4.000000 - 0
Queue Len: 4.000000 - 0
Received Event Message 18
Queue Len: 2.000000 - 0
Queue Len: 2.000000 - 0
Queue Len: 2.000000 - 0
Queue Len: 2.000000 - 0
Received Event Message 19
Queue Len: 2.000000 - 0
Queue Len: 1.000000 - 0