RabbitMQ/Pika 使用 evenlet 阻止消费者
RabbitMQ/Pika blocking consumer with evenlet
我正在开发一个需要 eventlet 的大型应用程序,现在也需要 rabbitMQ。看起来 eventlet 导致 pika 消费者线程阻止其他工作人员的执行。我知道 Pika 不被认为是线程安全的,所以我在它自己的线程中拥有一切,包括连接。我假设阻塞连接应该只阻塞消费者线程。我怎样才能让 pika 和 eventlet 一起工作?在下面的示例中,工作线程从不打印任何内容,但注释掉 eventlet.monkey_patch()
允许两个线程执行。
import threading
import pika
import eventlet
eventlet.monkey_patch()
def callback(ch, method, properties, body):
print body
ch.basic_ack(delivery_tag=method.delivery_tag)
def consumer():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test', durable=True,
exclusive=False, auto_delete=False)
channel.basic_consume(callback, queue='test')
channel.start_consuming()
def start_consumer_thread():
# initialize a listener thread
consumer_thread = threading.Thread(target=consumer)
consumer_thread.start()
def worker():
start_consumer_thread()
for x in range(1,10000):
print x
x = threading.Thread(target=worker())
x.start()
Pika 和 eventlet.monkey_patch
不兼容。如果可能的话,您将不得不使用 eventlet
而不修补系统调用。
注意: RabbitMQ 团队监控 the rabbitmq-users
mailing list 并且有时只在 Whosebug 上回答问题。
我能够尽早通过猴子补丁让 pika
消费者使用 eventlet
,并以补丁方式导入 pika
。
首先导入并修补标准库:
import eventlet
eventlet.monkey_patch()
然后导入并修补 pika
本身:
pika = eventlet.import_patched('pika')
我将此导入策略与 asynchronous_consumer_example 结合使用:https://pika.readthedocs.io/en/stable/examples/asynchronous_consumer_example.html 并使用 eventlet
原语而不是 threading
来实现非阻塞消费者。
我正在开发一个需要 eventlet 的大型应用程序,现在也需要 rabbitMQ。看起来 eventlet 导致 pika 消费者线程阻止其他工作人员的执行。我知道 Pika 不被认为是线程安全的,所以我在它自己的线程中拥有一切,包括连接。我假设阻塞连接应该只阻塞消费者线程。我怎样才能让 pika 和 eventlet 一起工作?在下面的示例中,工作线程从不打印任何内容,但注释掉 eventlet.monkey_patch()
允许两个线程执行。
import threading
import pika
import eventlet
eventlet.monkey_patch()
def callback(ch, method, properties, body):
print body
ch.basic_ack(delivery_tag=method.delivery_tag)
def consumer():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test', durable=True,
exclusive=False, auto_delete=False)
channel.basic_consume(callback, queue='test')
channel.start_consuming()
def start_consumer_thread():
# initialize a listener thread
consumer_thread = threading.Thread(target=consumer)
consumer_thread.start()
def worker():
start_consumer_thread()
for x in range(1,10000):
print x
x = threading.Thread(target=worker())
x.start()
Pika 和 eventlet.monkey_patch
不兼容。如果可能的话,您将不得不使用 eventlet
而不修补系统调用。
注意: RabbitMQ 团队监控 the rabbitmq-users
mailing list 并且有时只在 Whosebug 上回答问题。
我能够尽早通过猴子补丁让 pika
消费者使用 eventlet
,并以补丁方式导入 pika
。
首先导入并修补标准库:
import eventlet
eventlet.monkey_patch()
然后导入并修补 pika
本身:
pika = eventlet.import_patched('pika')
我将此导入策略与 asynchronous_consumer_example 结合使用:https://pika.readthedocs.io/en/stable/examples/asynchronous_consumer_example.html 并使用 eventlet
原语而不是 threading
来实现非阻塞消费者。