使用 Pika BlockingConnection 时,basic_ack() 是否必须放在回调函数中

when using Pika BlockingConnection, Does basic_ack() has to be placed in callback function

说我已经建立了如下所示的 RabbitMQ 连接:

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost', 5672, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue=getting_from_this_queue)
channel.basic_consume(
    callback, queue=getting_from_this_queue, no_ack=False)
channel.basic_qos( prefetch_count = 3 )

为了实现更好的并发性,我尝试将每个作业放在一个内部队列中,并创建了一个 while 循环来为从该内部队列中检索到的每个作业异步调度工作人员:

from Queue import Queue
from multiprocessing.dummy import Pool as ThreadPool

task_queue = Queue(10)
pool = Pool(20)

def worker(ch, method, job):
    # ...some heavy lifting...
     if job_gets_done:         # some abstraction
        print "job success"
        ch.basic_ack(delivery_tag=method.delivery_tag)   # PROBLEM : this seems not working
     else:
        print "job failed"

def callback(ch, method, properties, job):
     task_queue.put((ch,method,dn))     # put job in internal queue, block if full.

@threaded
def async_process_jobs():              # loop to get job and start thread worker.
    while True:
         params = task_queue.get()
         pool.apply_async( worker, params )   # param = (ch,method, job)


async_process_jobs()
channel.start_consuming()

问题是,当处理作业时,没有一个正确地发送确认(即使执行流真的通过它,即打印 "job success")。 rabbitmq 上的队列大小保持不变,为什么?

somewhat official tutorial 中,basic_ack() 被放置在 callback() 中,但我的没有。这可能是问题的根源吗?


详细行为(可能不重要):假设我在队列中有 10000 个作业,开始时,大约 2000 条消息进入 Unacked 状态,然后所有消息都回到就绪状态,即使我的工人仍在处理和打印 "job succes"(acking).

来自FAQ of pika

Pika does not have any notion of threading in the code. If you want to use Pika with threading, make sure you have a Pika connection per thread, created in that thread. It is not safe to share one Pika connection across threads.

我遇到了类似的问题,我注意到: 如果工作完成得很快,那么 ack 就可以工作 但是如果这个工作花费更多的时间,那么 ack 就不起作用了,即使它发出了。