Pika:消费下一条消息,即使上一条消息未被确认
Pika: Consume the next message even the last message was not acknowledged
对于服务器自动化,我们正在尝试开发一种工具,它可以在不同的服务器上处理和执行大量任务。我们将任务和服务器主机名发送到队列中。然后请求者使用队列,将信息提供给 ansible api。为了实现我们可以一次执行多个任务,我们正在使用线程。
现在我们无法确认消息...
到目前为止我们做了什么:
requester.py
消耗队列然后启动一个线程,其中 ansible 任务是 运行。然后将结果发送到另一个队列。因此,每条新消息都会创建一个新线程。任务完成,线程就死了。
但现在来了困难的部分。我们必须使消息持久化,以防我们的服务器死机。因此,每条消息都应在 来自 ansible 的结果发回后得到确认。
我们现在的问题是,当我们尝试在线程本身中确认消息时,没有更多的 "simultaneously" 工作完成,因为 pika 的 consume
等待确认。那么我们如何才能实现 consume
消费消息而不等待确认呢?或者我们如何解决或改进我们的小程序?
requester.py
#!/bin/python
from worker import *
import ansible.inventory
import ansible.runner
import threading
class Requester(Worker):
def __init__(self):
Worker.__init__(self)
self.connection(self.selfhost, self.from_db)
self.receive(self.from_db)
def send(self, result, ch, method):
self.channel.basic_publish(exchange='',
routing_key=self.to_db,
body=result,
properties=pika.BasicProperties(
delivery_mode=2,
))
print "[x] Sent \n" + result
ch.basic_ack(delivery_tag = method.delivery_tag)
def callAnsible(self, cmd, ch, method):
#call ansible api pre 2.0
result = json.dumps(result, sort_keys=True, indent=4, separators=(',', ': '))
self.send(result, ch, method)
def callback(self, ch, method, properties, body):
print(" [x] Received by requester %r" % body)
t = threading.Thread(target=self.callAnsible, args=(body,ch,method,))
t.start()
worker.py
import pika
import ConfigParser
import json
import os
class Worker(object):
def __init__(self):
#read some config files
def callback(self, ch, method, properties, body):
raise Exception("Call method in subclass")
def receive(self, queue):
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(self.callback,queue=queue)
self.channel.start_consuming()
def connection(self,server,queue):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host=server,
credentials=self.credentials))
self.channel = self.connection.channel()
self.channel.queue_declare(queue=queue, durable=True)
我们正在使用 Python 2.7 和 pika 0.10.0。
是的,我们在鼠兔常见问题解答中注意到:http://pika.readthedocs.io/en/0.10.0/faq.html
鼠兔不是线程安全的。
禁用自动确认并将预取计数设置为大于 1 的值,具体取决于您希望消费者接收多少条消息。
这是设置预取的方法
channel.basic_qos(prefetch_count=1)
,找到 here。
对于服务器自动化,我们正在尝试开发一种工具,它可以在不同的服务器上处理和执行大量任务。我们将任务和服务器主机名发送到队列中。然后请求者使用队列,将信息提供给 ansible api。为了实现我们可以一次执行多个任务,我们正在使用线程。
现在我们无法确认消息...
到目前为止我们做了什么:
requester.py
消耗队列然后启动一个线程,其中 ansible 任务是 运行。然后将结果发送到另一个队列。因此,每条新消息都会创建一个新线程。任务完成,线程就死了。
但现在来了困难的部分。我们必须使消息持久化,以防我们的服务器死机。因此,每条消息都应在 来自 ansible 的结果发回后得到确认。
我们现在的问题是,当我们尝试在线程本身中确认消息时,没有更多的 "simultaneously" 工作完成,因为 pika 的 consume
等待确认。那么我们如何才能实现 consume
消费消息而不等待确认呢?或者我们如何解决或改进我们的小程序?
requester.py
#!/bin/python
from worker import *
import ansible.inventory
import ansible.runner
import threading
class Requester(Worker):
def __init__(self):
Worker.__init__(self)
self.connection(self.selfhost, self.from_db)
self.receive(self.from_db)
def send(self, result, ch, method):
self.channel.basic_publish(exchange='',
routing_key=self.to_db,
body=result,
properties=pika.BasicProperties(
delivery_mode=2,
))
print "[x] Sent \n" + result
ch.basic_ack(delivery_tag = method.delivery_tag)
def callAnsible(self, cmd, ch, method):
#call ansible api pre 2.0
result = json.dumps(result, sort_keys=True, indent=4, separators=(',', ': '))
self.send(result, ch, method)
def callback(self, ch, method, properties, body):
print(" [x] Received by requester %r" % body)
t = threading.Thread(target=self.callAnsible, args=(body,ch,method,))
t.start()
worker.py
import pika
import ConfigParser
import json
import os
class Worker(object):
def __init__(self):
#read some config files
def callback(self, ch, method, properties, body):
raise Exception("Call method in subclass")
def receive(self, queue):
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(self.callback,queue=queue)
self.channel.start_consuming()
def connection(self,server,queue):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host=server,
credentials=self.credentials))
self.channel = self.connection.channel()
self.channel.queue_declare(queue=queue, durable=True)
我们正在使用 Python 2.7 和 pika 0.10.0。
是的,我们在鼠兔常见问题解答中注意到:http://pika.readthedocs.io/en/0.10.0/faq.html
鼠兔不是线程安全的。
禁用自动确认并将预取计数设置为大于 1 的值,具体取决于您希望消费者接收多少条消息。
这是设置预取的方法
channel.basic_qos(prefetch_count=1)
,找到 here。