RabbitMQ 队列长度始终为 0
RabbitMQ Queue Length is always 0
我正在编写一个应用程序,但我遇到了这个问题,一遍又一遍地查看代码,似乎没有任何问题,使用下面的基本代码段进行了测试,问题是可重现的....RabbitMQ 说队列是没有的时候总是空的。
下面的 Golang 代码片段显示生产者发送消息的频率高于消费者使用消息的频率。消费者始终处于活动状态,但睡眠时间更长,以使队列在其积压中有消息。结果?消费者每次尝试时都会获取消息,但是 API 总是说没有消息 -> 消息计数为 0。
package main
import (
"encoding/json"
"fmt"
"github.com/streadway/amqp"
"io/ioutil"
"net/http"
"testing"
"time"
)
func main() {
username := "guest"
password := "guest"
scheme := "amqp"
rabbitMqHost := "localhost"
port := "5672"
connectionString := fmt.Sprintf("%s://%s:%s@%s:%s/", scheme, username, password, rabbitMqHost, port)
conn, err := amqp.Dial(connectionString)
if err != nil {
panic(err)
}
ch, err := conn.Channel()
if err != nil {
panic(err)
}
exchangeName := "my-exchange"
// Declare exchange
err = ch.ExchangeDeclare(
exchangeName, // name
"fanout", // type
true, // durable
true, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
panic(err)
}
// Create first Queue
queueName := "my-queue"
q, err := ch.QueueDeclare(
queueName, // name
true, // durable
true, // delete when unsused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
panic(err)
}
// Bind Exchange to Queue
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
exchangeName, // exchange
false,
nil,
)
// Listen
eventQueue, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
panic(err)
}
go func() {
for a := range eventQueue {
fmt.Printf("Received Event %s\n", string(a.Body))
time.Sleep(time.Second * 4)
}
}()
go func() {
count := 0
for {
err = ch.Publish(exchangeName, "", false, false, amqp.Publishing{
ContentType: "application/json",
Body: []byte(fmt.Sprintf("Message %d", count)),
})
fmt.Printf("Sent Message %d\n", count)
count++
if err != nil {
panic(err)
}
time.Sleep(time.Second * 2)
}
}()
for {
httpRes, err := http.Get("http://guest:guest@localhost:15672/api/queues/%2f/my-queue")
if err != nil {
panic(err)
}
var resJson map[string]interface{}
content, err := ioutil.ReadAll(httpRes.Body)
if err != nil {
panic(err)
}
httpRes.Body.Close()
err = json.Unmarshal(content, &resJson)
if err != nil {
panic(err)
}
q2, err := ch.QueueDeclarePassive(
queueName, // name
true, // durable
true, // delete when unsused
false, // exclusive
false, // no-wait
nil,
)
fmt.Printf("Queue Len: %f - %d\n", resJson["messages"], q2.Messages)
time.Sleep(time.Second)
}
}
您可以使用以下 RabbitMQ 服务器进行测试:
docker run --rm --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
输出:
Sent Message 0
Received Event Message 0
Queue Len: %!f(<nil>) - 0
Queue Len: %!f(<nil>) - 0
Sent Message 1
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 1
Sent Message 2
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 3
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 2
Sent Message 4
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 5
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 3
Sent Message 6
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 7
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 4
Sent Message 8
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 9
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 5
Sent Message 10
....
....
Que Len 没有一次说它不是 0。在我的应用程序中,当我放入一堆消息时,我可能会看到它说 X,但很快它就变成了 0,我以为我有一个隐藏的消费者应用程序,但不,API 给出了一些不准确的结果,或者我应该在其他地方寻找长度。
更新
上面只有在有消费者时才会发生,如果队列没有消费者,它会按预期工作,只需注释.Consume
代码:
/*
eventQueue, err := ch.Consume(
...
go func(){
for a := range eventQueue {
..
}()
*/
现在它“有所改善”,但首先,它不是我要找的,其次,它仍然是奇怪的输出=S:
Sent Message 0
Queue Len: 0.000000 - 1
Queue Len: 0.000000 - 1
Sent Message 1
Queue Len: 0.000000 - 2
Queue Len: 0.000000 - 2
Sent Message 2
Queue Len: 0.000000 - 3
Queue Len: 1.000000 - 3
Sent Message 3
Queue Len: 1.000000 - 4
Queue Len: 1.000000 - 4
Sent Message 4
Queue Len: 1.000000 - 5
Queue Len: 1.000000 - 5
Sent Message 5
Queue Len: 4.000000 - 6
Queue Len: 4.000000 - 6
Sent Message 6
Queue Len: 4.000000 - 7
Queue Len: 4.000000 - 7
Sent Message 7
Queue Len: 4.000000 - 8
Queue Len: 6.000000 - 8
Sent Message 8
Queue Len: 6.000000 - 9
Queue Len: 6.000000 - 9
Sent Message 9
Queue Len: 6.000000 - 10
Queue Len: 6.000000 - 10
Sent Message 10
Queue Len: 9.000000 - 11
Queue Len: 9.000000 - 11
字段 q2.Messages
不可靠,它是 未等待确认的消息计数 ,即已经确认的消息。
您的消费者声明为 autoAck = true
— 即 noAck
—,这意味着不需要确认,这意味着已经有零条消息已确认。
当您注释掉消费者时,确认消息的数量可能取决于发布者缓冲区。
使用 AMQP 0.9.1 在给定队列上以编程方式获取精确数量的消息基本上是不可能的。您可以使用管理 API 中的 message_stats
字段代替:
http://localhost:15672/api/queues/vhost/queue_name
接受的解决方案将是 blackgreen 的。证明是下面的替换,只需将问题部分中的消费者和发布者代码替换为:
// Listen
eventQueue, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack <-- Difference
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
panic(err)
}
go func() {
for a := range eventQueue {
err = ch.Ack(a.DeliveryTag, false) // <-- Difference
if err != nil {
panic(err)
}
fmt.Printf("Received Event %s\n", string(a.Body))
time.Sleep(time.Second * 4)
}
}()
go func() {
count := 0
for {
err = ch.Publish(exchangeName, "", false, false, amqp.Publishing{
ContentType: "application/json",
Body: []byte(fmt.Sprintf("Message %d", count)),
})
fmt.Printf("Sent Message %d\n", count)
count++
if err != nil {
panic(err)
}
if count >= 20 { // <-- Difference
break
}
time.Sleep(time.Second * 2)
}
}()
输出:
.... The increase in the queue length
Sent Message 13
Queue Len: 8.000000 - 0
Queue Len: 8.000000 - 0
Received Event Message 4
Sent Message 14
Queue Len: 8.000000 - 0
Queue Len: 9.000000 - 0
Sent Message 15
Queue Len: 9.000000 - 0
Queue Len: 9.000000 - 0
Received Event Message 5
Sent Message 16
Queue Len: 9.000000 - 0
Queue Len: 9.000000 - 0
Sent Message 17
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Received Event Message 6
Sent Message 18
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Sent Message 19
Queue Len: 11.000000 - 0
Queue Len: 12.000000 - 0
Received Event Message 7
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Received Event Message 8
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Received Event Message 9
Queue Len: 12.000000 - 0
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Received Event Message 10
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Queue Len: 10.000000 - 0
Queue Len: 10.000000 - 0
Received Event Message 11
Queue Len: 10.000000 - 0
Queue Len: 10.000000 - 0
Queue Len: 10.000000 - 0
Queue Len: 9.000000 - 0
Received Event Message 12
....
As publisher exits it decreases, the consumer catches up and message len decreases:
Received Event Message 16
Queue Len: 5.000000 - 0
Queue Len: 5.000000 - 0
Queue Len: 5.000000 - 0
Queue Len: 4.000000 - 0
Received Event Message 17
Queue Len: 4.000000 - 0
Queue Len: 4.000000 - 0
Queue Len: 4.000000 - 0
Queue Len: 4.000000 - 0
Received Event Message 18
Queue Len: 2.000000 - 0
Queue Len: 2.000000 - 0
Queue Len: 2.000000 - 0
Queue Len: 2.000000 - 0
Received Event Message 19
Queue Len: 2.000000 - 0
Queue Len: 1.000000 - 0
我正在编写一个应用程序,但我遇到了这个问题,一遍又一遍地查看代码,似乎没有任何问题,使用下面的基本代码段进行了测试,问题是可重现的....RabbitMQ 说队列是没有的时候总是空的。
下面的 Golang 代码片段显示生产者发送消息的频率高于消费者使用消息的频率。消费者始终处于活动状态,但睡眠时间更长,以使队列在其积压中有消息。结果?消费者每次尝试时都会获取消息,但是 API 总是说没有消息 -> 消息计数为 0。
package main
import (
"encoding/json"
"fmt"
"github.com/streadway/amqp"
"io/ioutil"
"net/http"
"testing"
"time"
)
func main() {
username := "guest"
password := "guest"
scheme := "amqp"
rabbitMqHost := "localhost"
port := "5672"
connectionString := fmt.Sprintf("%s://%s:%s@%s:%s/", scheme, username, password, rabbitMqHost, port)
conn, err := amqp.Dial(connectionString)
if err != nil {
panic(err)
}
ch, err := conn.Channel()
if err != nil {
panic(err)
}
exchangeName := "my-exchange"
// Declare exchange
err = ch.ExchangeDeclare(
exchangeName, // name
"fanout", // type
true, // durable
true, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
panic(err)
}
// Create first Queue
queueName := "my-queue"
q, err := ch.QueueDeclare(
queueName, // name
true, // durable
true, // delete when unsused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
panic(err)
}
// Bind Exchange to Queue
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
exchangeName, // exchange
false,
nil,
)
// Listen
eventQueue, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
panic(err)
}
go func() {
for a := range eventQueue {
fmt.Printf("Received Event %s\n", string(a.Body))
time.Sleep(time.Second * 4)
}
}()
go func() {
count := 0
for {
err = ch.Publish(exchangeName, "", false, false, amqp.Publishing{
ContentType: "application/json",
Body: []byte(fmt.Sprintf("Message %d", count)),
})
fmt.Printf("Sent Message %d\n", count)
count++
if err != nil {
panic(err)
}
time.Sleep(time.Second * 2)
}
}()
for {
httpRes, err := http.Get("http://guest:guest@localhost:15672/api/queues/%2f/my-queue")
if err != nil {
panic(err)
}
var resJson map[string]interface{}
content, err := ioutil.ReadAll(httpRes.Body)
if err != nil {
panic(err)
}
httpRes.Body.Close()
err = json.Unmarshal(content, &resJson)
if err != nil {
panic(err)
}
q2, err := ch.QueueDeclarePassive(
queueName, // name
true, // durable
true, // delete when unsused
false, // exclusive
false, // no-wait
nil,
)
fmt.Printf("Queue Len: %f - %d\n", resJson["messages"], q2.Messages)
time.Sleep(time.Second)
}
}
您可以使用以下 RabbitMQ 服务器进行测试:
docker run --rm --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
输出:
Sent Message 0
Received Event Message 0
Queue Len: %!f(<nil>) - 0
Queue Len: %!f(<nil>) - 0
Sent Message 1
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 1
Sent Message 2
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 3
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 2
Sent Message 4
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 5
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 3
Sent Message 6
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 7
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 4
Sent Message 8
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 9
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 5
Sent Message 10
....
....
Que Len 没有一次说它不是 0。在我的应用程序中,当我放入一堆消息时,我可能会看到它说 X,但很快它就变成了 0,我以为我有一个隐藏的消费者应用程序,但不,API 给出了一些不准确的结果,或者我应该在其他地方寻找长度。
更新
上面只有在有消费者时才会发生,如果队列没有消费者,它会按预期工作,只需注释.Consume
代码:
/*
eventQueue, err := ch.Consume(
...
go func(){
for a := range eventQueue {
..
}()
*/
现在它“有所改善”,但首先,它不是我要找的,其次,它仍然是奇怪的输出=S:
Sent Message 0
Queue Len: 0.000000 - 1
Queue Len: 0.000000 - 1
Sent Message 1
Queue Len: 0.000000 - 2
Queue Len: 0.000000 - 2
Sent Message 2
Queue Len: 0.000000 - 3
Queue Len: 1.000000 - 3
Sent Message 3
Queue Len: 1.000000 - 4
Queue Len: 1.000000 - 4
Sent Message 4
Queue Len: 1.000000 - 5
Queue Len: 1.000000 - 5
Sent Message 5
Queue Len: 4.000000 - 6
Queue Len: 4.000000 - 6
Sent Message 6
Queue Len: 4.000000 - 7
Queue Len: 4.000000 - 7
Sent Message 7
Queue Len: 4.000000 - 8
Queue Len: 6.000000 - 8
Sent Message 8
Queue Len: 6.000000 - 9
Queue Len: 6.000000 - 9
Sent Message 9
Queue Len: 6.000000 - 10
Queue Len: 6.000000 - 10
Sent Message 10
Queue Len: 9.000000 - 11
Queue Len: 9.000000 - 11
字段 q2.Messages
不可靠,它是 未等待确认的消息计数 ,即已经确认的消息。
您的消费者声明为 autoAck = true
— 即 noAck
—,这意味着不需要确认,这意味着已经有零条消息已确认。
当您注释掉消费者时,确认消息的数量可能取决于发布者缓冲区。
使用 AMQP 0.9.1 在给定队列上以编程方式获取精确数量的消息基本上是不可能的。您可以使用管理 API 中的 message_stats
字段代替:
http://localhost:15672/api/queues/vhost/queue_name
接受的解决方案将是 blackgreen 的。证明是下面的替换,只需将问题部分中的消费者和发布者代码替换为:
// Listen
eventQueue, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack <-- Difference
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
panic(err)
}
go func() {
for a := range eventQueue {
err = ch.Ack(a.DeliveryTag, false) // <-- Difference
if err != nil {
panic(err)
}
fmt.Printf("Received Event %s\n", string(a.Body))
time.Sleep(time.Second * 4)
}
}()
go func() {
count := 0
for {
err = ch.Publish(exchangeName, "", false, false, amqp.Publishing{
ContentType: "application/json",
Body: []byte(fmt.Sprintf("Message %d", count)),
})
fmt.Printf("Sent Message %d\n", count)
count++
if err != nil {
panic(err)
}
if count >= 20 { // <-- Difference
break
}
time.Sleep(time.Second * 2)
}
}()
输出:
.... The increase in the queue length
Sent Message 13
Queue Len: 8.000000 - 0
Queue Len: 8.000000 - 0
Received Event Message 4
Sent Message 14
Queue Len: 8.000000 - 0
Queue Len: 9.000000 - 0
Sent Message 15
Queue Len: 9.000000 - 0
Queue Len: 9.000000 - 0
Received Event Message 5
Sent Message 16
Queue Len: 9.000000 - 0
Queue Len: 9.000000 - 0
Sent Message 17
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Received Event Message 6
Sent Message 18
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Sent Message 19
Queue Len: 11.000000 - 0
Queue Len: 12.000000 - 0
Received Event Message 7
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Received Event Message 8
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Received Event Message 9
Queue Len: 12.000000 - 0
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Received Event Message 10
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Queue Len: 10.000000 - 0
Queue Len: 10.000000 - 0
Received Event Message 11
Queue Len: 10.000000 - 0
Queue Len: 10.000000 - 0
Queue Len: 10.000000 - 0
Queue Len: 9.000000 - 0
Received Event Message 12
....
As publisher exits it decreases, the consumer catches up and message len decreases:
Received Event Message 16
Queue Len: 5.000000 - 0
Queue Len: 5.000000 - 0
Queue Len: 5.000000 - 0
Queue Len: 4.000000 - 0
Received Event Message 17
Queue Len: 4.000000 - 0
Queue Len: 4.000000 - 0
Queue Len: 4.000000 - 0
Queue Len: 4.000000 - 0
Received Event Message 18
Queue Len: 2.000000 - 0
Queue Len: 2.000000 - 0
Queue Len: 2.000000 - 0
Queue Len: 2.000000 - 0
Received Event Message 19
Queue Len: 2.000000 - 0
Queue Len: 1.000000 - 0