RabbitMQ unacked messages not getting requeued

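I have been running into a problem with my RabbitMQ consumer for a long time: some of my messages end up stuck in the unacknowledged state.

My RabbitMQ version: 3.6.15. Pika version: 0.11.0b.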

import pika
import time
import sys
import threading
from Queue import Queue
rabbitmq_server = "<SERVER>"
queue = "<QUEUE>"
connection = None

def check_acknowledge(channel, connection, ack_queue):
    delivery_tag = None
    while(True):
        try:
            delivery_tag = ack_queue.get_nowait()
            channel.basic_nack(delivery_tag=delivery_tag)
            break
        except:
            connection.process_data_events()
        time.sleep(1)


def process_message(body, delivery_tag, ack_queue):
    print "Received %s" % (body)
    print "Waiting for 600 seconds before receiving next ID\n"
    start = time.time()
    elapsed = 0
    while elapsed < 10:
        elapsed = time.time() - start
        print "loop cycle time: %f, seconds count: %02d" %(time.clock(), elapsed)
        time.sleep(1)
    ack_queue.put(delivery_tag)




def callback(ch, method, properties, body):
    global connection
    ack_queue = Queue()
    t = threading.Thread(target=process_message, args=(body, method.delivery_tag, ack_queue))
    t.start()
    check_acknowledge(ch, connection, ack_queue)

while True:
    try:
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_server))
        channel = connection.channel()
        print ' [*] Waiting for messages. To exit press CTRL+C'
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(callback, queue=queue)
        channel.start_consuming()
    except KeyboardInterrupt:
        break

channel.close()
connection.close()
exit(0)

Am I missing something?

I solved this problem with the following multithreaded consumer.

import pika
import time
import sys
import threading
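from Queue import Queue, Empty
rabbitmq_server = "<RABBITMQ_SERVER_IP>"
queue = "hello1"
connection = None


def check_acknowledge(channel, connection, ack_queue):
    # Runs in the main thread, which owns the connection and channel.
    delivery_tag = None
    while(True):
        try:
            # Non-blocking check for a tag handed over by the worker thread.
            delivery_tag = ack_queue.get_nowait()
            channel.basic_ack(delivery_tag=delivery_tag)
            break
        except Empty:
            # Nothing to ack yet: let pika service the connection.
            connection.process_data_events()
        time.sleep(1)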


def process_message(body, delivery_tag, ack_queue):
    print "Received %s" % (body)
    print "Waiting for 600 seconds before receiving next ID\n"
    start = time.time()
    elapsed = 0
    while elapsed < 300:
        elapsed = time.time() - start
        print "loop cycle time: %f, seconds count: %02d" %(time.clock(), elapsed)
        time.sleep(1)
    ack_queue.put(delivery_tag)




def callback(ch, method, properties, body):
    global connection
    ack_queue = Queue()
    t = threading.Thread(target=process_message, args=(body, method.delivery_tag, ack_queue))
    t.start()
    check_acknowledge(ch, connection, ack_queue)

while True:
    try:
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_server))
        channel = connection.channel()
        print ' [*] Waiting for messages. To exit press CTRL+C'
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(callback, queue=queue)
        channel.start_consuming()
    except KeyboardInterrupt:
        break

channel.close()
connection.close()
exit(0)
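
  1. The consumer's callback function calls a separate function, check_acknowledge, in the main thread itself, so the connection and channel objects stay in the same thread. Note that Pika is not thread-safe, so these objects must be kept in a single thread.
  2. The actual processing happens in a new thread spawned from the main thread.
  3. Once process_message finishes processing, it puts the delivery_tag on the queue.
  4. check_acknowledge loops indefinitely until it finds the delivery_tag that process_message put on the queue. Once it finds it, it acks the message and returns.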
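
The hand-off described in the steps above can be tried without a broker. The following is only a minimal sketch, written for Python 3 (the consumer above is Python 2), and FakeChannel and FakeConnection are hypothetical stand-ins for the pika objects, not part of pika. It shows the worker thread passing the delivery tag back through a queue while the ack is issued only from the thread that owns the channel.

```python
import queue
import threading
import time


class FakeChannel:
    """Hypothetical stand-in for a pika channel; records acked tags."""

    def __init__(self):
        self.acked = []
        self.owner_thread = threading.current_thread()

    def basic_ack(self, delivery_tag):
        # The ack must come from the thread that created the channel.
        assert threading.current_thread() is self.owner_thread
        self.acked.append(delivery_tag)


class FakeConnection:
    """Hypothetical stand-in for a pika BlockingConnection."""

    def __init__(self):
        self.polls = 0

    def process_data_events(self):
        # A real connection would service pending I/O here.
        self.polls += 1


def process_message(body, delivery_tag, ack_queue, work_seconds):
    # Worker thread: simulate slow processing, then hand the tag back.
    time.sleep(work_seconds)
    ack_queue.put(delivery_tag)


def check_acknowledge(channel, connection, ack_queue, poll_interval=0.05):
    # Main thread: poll for the tag and keep the connection serviced meanwhile.
    while True:
        try:
            delivery_tag = ack_queue.get_nowait()
        except queue.Empty:
            connection.process_data_events()
            time.sleep(poll_interval)
            continue
        channel.basic_ack(delivery_tag=delivery_tag)
        return delivery_tag


def callback(channel, connection, delivery_tag, body, work_seconds=0.2):
    # One private queue per message, as in the consumer above.
    ack_queue = queue.Queue()
    worker = threading.Thread(
        target=process_message,
        args=(body, delivery_tag, ack_queue, work_seconds),
    )
    worker.start()
    tag = check_acknowledge(channel, connection, ack_queue)
    worker.join()
    return tag
```

Calling callback(FakeChannel(), FakeConnection(), 7, b"hello") blocks until the worker finishes and then acks tag 7 from the calling thread. Catching queue.Empty rather than every exception is a choice made in this sketch so that a failure inside basic_ack is not silently swallowed.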

I tested this implementation by running the consumer with sleep times of 5 minutes, 10 minutes, 30 minutes, and one hour. It works well for me.