Python 和 RabbitMQ - 收听来自多个渠道的消费事件的最佳方式?
Python and RabbitMQ - Best way to listen to consume events from multiple channels?
我有两个独立的 RabbitMQ 实例。我正在尝试找到最好的方式来收听来自两者的事件。
例如,我可以使用以下方式消费事件:
credentials = pika.PlainCredentials(user, pass)
connection = pika.BlockingConnection(pika.ConnectionParameters(host="host1", credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(Exclusive=True)
self.channel.queue_bind(exchange="my-exchange", result.method.queue, routing_key='*.*.*.*.*')
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
self.channel.start_consuming()
我有第二位主持人,"host2",我也想听听。我考虑过创建两个单独的线程来执行此操作,但据我所知,鼠兔不是线程安全的。有没有更好的办法?或者创建两个单独的线程,每个线程监听不同的 Rabbit 实例(host1 和 host2)就足够了吗?
"what is the best way" 的答案在很大程度上取决于您的队列使用模式以及 "best" 的含义。由于我还不能对问题发表评论,我只会尝试提出一些可能的解决方案。
在每个示例中,我都假设交换已经声明。
线程
您可以使用 pika
在单个进程中使用来自不同主机上的两个队列的消息。
你是对的 - its own FAQ states, pika
is not thread safe, but it can be used in multi-threaded manner by creating connections to RabbitMQ hosts per thread. Making this example run in threads using threading
模块如下所示:
import pika
import threading
class ConsumerThread(threading.Thread):
def __init__(self, host, *args, **kwargs):
super(ConsumerThread, self).__init__(*args, **kwargs)
self._host = host
# Not necessarily a method.
def callback_func(self, channel, method, properties, body):
print("{} received '{}'".format(self.name, body))
def run(self):
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self._host,
credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(exclusive=True)
channel.queue_bind(result.method.queue,
exchange="my-exchange",
routing_key="*.*.*.*.*")
channel.basic_consume(self.callback_func,
result.method.queue,
no_ack=True)
channel.start_consuming()
if __name__ == "__main__":
threads = [ConsumerThread("host1"), ConsumerThread("host2")]
for thread in threads:
thread.start()
我已将 callback_func
声明为一种纯粹用于在打印邮件正文时使用 ConsumerThread.name
的方法。它也可能是 ConsumerThread
class.
之外的函数
进程
或者,您始终可以只 运行 一个进程,每个队列都包含您要消费事件的消费者代码。
import pika
import sys
def callback_func(channel, method, properties, body):
print(body)
if __name__ == "__main__":
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=sys.argv[1],
credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(exclusive=True)
channel.queue_bind(result.method.queue,
exchange="my-exchange",
routing_key="*.*.*.*.*")
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
channel.start_consuming()
然后 运行 通过:
$ python single_consume.py host1
$ python single_consume.py host2 # e.g. on another console
如果您对来自队列的消息所做的工作是 CPU-heavy,并且只要 CPU 中的核心数量 >= 消费者数量,通常最好使用此方法- 除非你的队列大部分时间都是空的并且消费者不会利用这个 CPU 时间*。
异步
另一个选择是涉及一些异步框架(例如Twisted
)和运行在单线程中完成整个事情。
您不能再在异步代码中使用 BlockingConnection
;幸运的是,pika
有适配器 Twisted
:
from pika.adapters.twisted_connection import TwistedProtocolConnection
from pika.connection import ConnectionParameters
from twisted.internet import protocol, reactor, task
from twisted.python import log
class Consumer(object):
def on_connected(self, connection):
d = connection.channel()
d.addCallback(self.got_channel)
d.addCallback(self.queue_declared)
d.addCallback(self.queue_bound)
d.addCallback(self.handle_deliveries)
d.addErrback(log.err)
def got_channel(self, channel):
self.channel = channel
return self.channel.queue_declare(exclusive=True)
def queue_declared(self, queue):
self._queue_name = queue.method.queue
self.channel.queue_bind(queue=self._queue_name,
exchange="my-exchange",
routing_key="*.*.*.*.*")
def queue_bound(self, ignored):
return self.channel.basic_consume(queue=self._queue_name)
def handle_deliveries(self, queue_and_consumer_tag):
queue, consumer_tag = queue_and_consumer_tag
self.looping_call = task.LoopingCall(self.consume_from_queue, queue)
return self.looping_call.start(0)
def consume_from_queue(self, queue):
d = queue.get()
return d.addCallback(lambda result: self.handle_payload(*result))
def handle_payload(self, channel, method, properties, body):
print(body)
if __name__ == "__main__":
consumer1 = Consumer()
consumer2 = Consumer()
parameters = ConnectionParameters()
cc = protocol.ClientCreator(reactor,
TwistedProtocolConnection,
parameters)
d1 = cc.connectTCP("host1", 5672)
d1.addCallback(lambda protocol: protocol.ready)
d1.addCallback(consumer1.on_connected)
d1.addErrback(log.err)
d2 = cc.connectTCP("host2", 5672)
d2.addCallback(lambda protocol: protocol.ready)
d2.addCallback(consumer2.on_connected)
d2.addErrback(log.err)
reactor.run()
这种方法会更好,您消耗的队列越多,消费者执行的工作受到的CPU约束就越少*。
Python 3
自从您提到 pika
,我将自己限制在基于 Python 2.x 的解决方案,因为 pika
尚未移植。
但如果您想移动到 >=3.3,一个可能的选择是使用 asyncio
with one of AMQP protocol (the protocol you speak in with RabbitMQ) , e.g. asynqp
or aioamqp
.
* - 请注意,这些都是非常肤浅的提示 - 在大多数情况下,选择并不那么明显;什么对你最好取决于队列 "saturation" (messages/time),你在收到这些消息后做什么工作,你 运行 你的消费者在什么环境等;除了对所有实施进行基准测试外,没有其他方法可以确定
下面是我如何使用一个rabbitmq实例同时监听2个队列的例子:
import pika
import threading
threads=[]
def client_info(channel):
channel.queue_declare(queue='proxy-python')
print (' [*] Waiting for client messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print (" Received %s" % (body))
channel.basic_consume(callback, queue='proxy-python', no_ack=True)
channel.start_consuming()
def scenario_info(channel):
channel.queue_declare(queue='savi-virnet-python')
print (' [*] Waiting for scenrio messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print (" Received %s" % (body))
channel.basic_consume(callback, queue='savi-virnet-python', no_ack=True)
channel.start_consuming()
def manager():
connection1= pika.BlockingConnection(pika.ConnectionParameters
(host='localhost'))
channel1 = connection1.channel()
connection2= pika.BlockingConnection(pika.ConnectionParameters
(host='localhost'))
channel2 = connection2.channel()
t1 = threading.Thread(target=client_info, args=(channel1,))
t1.daemon = True
threads.append(t1)
t1.start()
t2 = threading.Thread(target=scenario_info, args=(channel2,))
t2.daemon = True
threads.append(t2)
t2.start()
for t in threads:
t.join()
manager()
import asyncio
import tornado.ioloop
import tornado.web
from aio_pika import connect_robust, Message
tornado.ioloop.IOLoop.configure("tornado.platform.asyncio.AsyncIOLoop")
io_loop = tornado.ioloop.IOLoop.current()
asyncio.set_event_loop(io_loop.asyncio_loop)
QUEUE = asyncio.Queue()
class SubscriberHandler(tornado.web.RequestHandler):
async def get(self):
message = await QUEUE.get()
self.finish(message.body)
class PublisherHandler(tornado.web.RequestHandler):
async def post(self):
connection = self.application.settings["amqp_connection"]
channel = await connection.channel()
try:
await channel.default_exchange.publish(
Message(body=self.request.body), routing_key="test",
)
finally:
await channel.close()
print('ok')
self.finish("OK")
async def make_app():
amqp_connection = await connect_robust()
channel = await amqp_connection.channel()
queue = await channel.declare_queue("test", auto_delete=True)
await queue.consume(QUEUE.put, no_ack=True)
return tornado.web.Application(
[(r"/publish", PublisherHandler), (r"/subscribe", SubscriberHandler)],
amqp_connection=amqp_connection,
)
if __name__ == "__main__":
app = io_loop.asyncio_loop.run_until_complete(make_app())
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
你可以异步使用aio-pika
更多例子在这里
https://buildmedia.readthedocs.org/media/pdf/aio-pika/latest/aio-pika.pdf
编码愉快:)
Pika 可以用于多线程消费者。唯一的要求是有一个 Pika connection per thread.
Pika Github 存储库有 an example here.
来自 basic_consumer_threaded.py 的片段:
def on_message(ch, method_frame, _header_frame, body, args):
(conn, thrds) = args
delivery_tag = method_frame.delivery_tag
t = threading.Thread(target=do_work, args=(conn, ch, delivery_tag, body))
t.start()
thrds.append(t)
threads = []
on_message_callback = functools.partial(on_message, args=(connection, threads))
channel.basic_consume('standard', on_message_callback)
我有两个独立的 RabbitMQ 实例。我正在尝试找到最好的方式来收听来自两者的事件。
例如,我可以使用以下方式消费事件:
credentials = pika.PlainCredentials(user, pass)
connection = pika.BlockingConnection(pika.ConnectionParameters(host="host1", credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(Exclusive=True)
self.channel.queue_bind(exchange="my-exchange", result.method.queue, routing_key='*.*.*.*.*')
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
self.channel.start_consuming()
我有第二位主持人,"host2",我也想听听。我考虑过创建两个单独的线程来执行此操作,但据我所知,鼠兔不是线程安全的。有没有更好的办法?或者创建两个单独的线程,每个线程监听不同的 Rabbit 实例(host1 和 host2)就足够了吗?
"what is the best way" 的答案在很大程度上取决于您的队列使用模式以及 "best" 的含义。由于我还不能对问题发表评论,我只会尝试提出一些可能的解决方案。
在每个示例中,我都假设交换已经声明。
线程
您可以使用 pika
在单个进程中使用来自不同主机上的两个队列的消息。
你是对的 - its own FAQ states, pika
is not thread safe, but it can be used in multi-threaded manner by creating connections to RabbitMQ hosts per thread. Making this example run in threads using threading
模块如下所示:
import pika
import threading
class ConsumerThread(threading.Thread):
def __init__(self, host, *args, **kwargs):
super(ConsumerThread, self).__init__(*args, **kwargs)
self._host = host
# Not necessarily a method.
def callback_func(self, channel, method, properties, body):
print("{} received '{}'".format(self.name, body))
def run(self):
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self._host,
credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(exclusive=True)
channel.queue_bind(result.method.queue,
exchange="my-exchange",
routing_key="*.*.*.*.*")
channel.basic_consume(self.callback_func,
result.method.queue,
no_ack=True)
channel.start_consuming()
if __name__ == "__main__":
threads = [ConsumerThread("host1"), ConsumerThread("host2")]
for thread in threads:
thread.start()
我已将 callback_func
声明为一种纯粹用于在打印邮件正文时使用 ConsumerThread.name
的方法。它也可能是 ConsumerThread
class.
进程
或者,您始终可以只 运行 一个进程,每个队列都包含您要消费事件的消费者代码。
import pika
import sys
def callback_func(channel, method, properties, body):
print(body)
if __name__ == "__main__":
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=sys.argv[1],
credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(exclusive=True)
channel.queue_bind(result.method.queue,
exchange="my-exchange",
routing_key="*.*.*.*.*")
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
channel.start_consuming()
然后 运行 通过:
$ python single_consume.py host1
$ python single_consume.py host2 # e.g. on another console
如果您对来自队列的消息所做的工作是 CPU-heavy,并且只要 CPU 中的核心数量 >= 消费者数量,通常最好使用此方法- 除非你的队列大部分时间都是空的并且消费者不会利用这个 CPU 时间*。
异步
另一个选择是涉及一些异步框架(例如Twisted
)和运行在单线程中完成整个事情。
您不能再在异步代码中使用 BlockingConnection
;幸运的是,pika
有适配器 Twisted
:
from pika.adapters.twisted_connection import TwistedProtocolConnection
from pika.connection import ConnectionParameters
from twisted.internet import protocol, reactor, task
from twisted.python import log
class Consumer(object):
def on_connected(self, connection):
d = connection.channel()
d.addCallback(self.got_channel)
d.addCallback(self.queue_declared)
d.addCallback(self.queue_bound)
d.addCallback(self.handle_deliveries)
d.addErrback(log.err)
def got_channel(self, channel):
self.channel = channel
return self.channel.queue_declare(exclusive=True)
def queue_declared(self, queue):
self._queue_name = queue.method.queue
self.channel.queue_bind(queue=self._queue_name,
exchange="my-exchange",
routing_key="*.*.*.*.*")
def queue_bound(self, ignored):
return self.channel.basic_consume(queue=self._queue_name)
def handle_deliveries(self, queue_and_consumer_tag):
queue, consumer_tag = queue_and_consumer_tag
self.looping_call = task.LoopingCall(self.consume_from_queue, queue)
return self.looping_call.start(0)
def consume_from_queue(self, queue):
d = queue.get()
return d.addCallback(lambda result: self.handle_payload(*result))
def handle_payload(self, channel, method, properties, body):
print(body)
if __name__ == "__main__":
consumer1 = Consumer()
consumer2 = Consumer()
parameters = ConnectionParameters()
cc = protocol.ClientCreator(reactor,
TwistedProtocolConnection,
parameters)
d1 = cc.connectTCP("host1", 5672)
d1.addCallback(lambda protocol: protocol.ready)
d1.addCallback(consumer1.on_connected)
d1.addErrback(log.err)
d2 = cc.connectTCP("host2", 5672)
d2.addCallback(lambda protocol: protocol.ready)
d2.addCallback(consumer2.on_connected)
d2.addErrback(log.err)
reactor.run()
这种方法会更好,您消耗的队列越多,消费者执行的工作受到的CPU约束就越少*。
Python 3
自从您提到 pika
,我将自己限制在基于 Python 2.x 的解决方案,因为 pika
尚未移植。
但如果您想移动到 >=3.3,一个可能的选择是使用 asyncio
with one of AMQP protocol (the protocol you speak in with RabbitMQ) , e.g. asynqp
or aioamqp
.
* - 请注意,这些都是非常肤浅的提示 - 在大多数情况下,选择并不那么明显;什么对你最好取决于队列 "saturation" (messages/time),你在收到这些消息后做什么工作,你 运行 你的消费者在什么环境等;除了对所有实施进行基准测试外,没有其他方法可以确定
下面是我如何使用一个rabbitmq实例同时监听2个队列的例子:
import pika
import threading
threads=[]
def client_info(channel):
channel.queue_declare(queue='proxy-python')
print (' [*] Waiting for client messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print (" Received %s" % (body))
channel.basic_consume(callback, queue='proxy-python', no_ack=True)
channel.start_consuming()
def scenario_info(channel):
channel.queue_declare(queue='savi-virnet-python')
print (' [*] Waiting for scenrio messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print (" Received %s" % (body))
channel.basic_consume(callback, queue='savi-virnet-python', no_ack=True)
channel.start_consuming()
def manager():
connection1= pika.BlockingConnection(pika.ConnectionParameters
(host='localhost'))
channel1 = connection1.channel()
connection2= pika.BlockingConnection(pika.ConnectionParameters
(host='localhost'))
channel2 = connection2.channel()
t1 = threading.Thread(target=client_info, args=(channel1,))
t1.daemon = True
threads.append(t1)
t1.start()
t2 = threading.Thread(target=scenario_info, args=(channel2,))
t2.daemon = True
threads.append(t2)
t2.start()
for t in threads:
t.join()
manager()
import asyncio
import tornado.ioloop
import tornado.web
from aio_pika import connect_robust, Message
tornado.ioloop.IOLoop.configure("tornado.platform.asyncio.AsyncIOLoop")
io_loop = tornado.ioloop.IOLoop.current()
asyncio.set_event_loop(io_loop.asyncio_loop)
QUEUE = asyncio.Queue()
class SubscriberHandler(tornado.web.RequestHandler):
async def get(self):
message = await QUEUE.get()
self.finish(message.body)
class PublisherHandler(tornado.web.RequestHandler):
async def post(self):
connection = self.application.settings["amqp_connection"]
channel = await connection.channel()
try:
await channel.default_exchange.publish(
Message(body=self.request.body), routing_key="test",
)
finally:
await channel.close()
print('ok')
self.finish("OK")
async def make_app():
amqp_connection = await connect_robust()
channel = await amqp_connection.channel()
queue = await channel.declare_queue("test", auto_delete=True)
await queue.consume(QUEUE.put, no_ack=True)
return tornado.web.Application(
[(r"/publish", PublisherHandler), (r"/subscribe", SubscriberHandler)],
amqp_connection=amqp_connection,
)
if __name__ == "__main__":
app = io_loop.asyncio_loop.run_until_complete(make_app())
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
你可以异步使用aio-pika 更多例子在这里 https://buildmedia.readthedocs.org/media/pdf/aio-pika/latest/aio-pika.pdf
编码愉快:)
Pika 可以用于多线程消费者。唯一的要求是有一个 Pika connection per thread.
Pika Github 存储库有 an example here.
来自 basic_consumer_threaded.py 的片段:
def on_message(ch, method_frame, _header_frame, body, args):
(conn, thrds) = args
delivery_tag = method_frame.delivery_tag
t = threading.Thread(target=do_work, args=(conn, ch, delivery_tag, body))
t.start()
thrds.append(t)
threads = []
on_message_callback = functools.partial(on_message, args=(connection, threads))
channel.basic_consume('standard', on_message_callback)