运行 带有 Quart 的 RabbitMQ 鼠兔
Running RabbitMQ Pika with Quart
我正在使用 Quart 框架,但我还需要使用 RabbitMQ Pika 连接器,但我无法让它们很好地发挥作用,因为它们都有无限循环。
入口点:
from quart import Quart
from .service import Service
app = Quart(__name__)
@app.before_serving
async def startup():
app.service_task = asyncio.ensure_future(service.start())
if not service.initialise():
sys.exit()
服务Class:
class Service:
def __init__(self, new_instance):
self._connection = None
self._channel = None
self._messaging_thread = None
def initialise(self):
credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters('localhost', credentials=credentials)
self._connection = pika.BlockingConnection(parameters)
self._channel = self._connection.channel()
self._channel.queue_declare(queue='to_be_processed_queue')
self._channel.basic_consume(queue='to_be_processed_queue',
auto_ack=True,
on_message_callback=self.callback)
print('creating thread')
self._messaging_thread = Thread(target=self.run_consume())
#self._messaging_thread.start()
print('Thread created...')
def run_consume(self):
try:
self._channel.start_consuming()
except KeyboardInterrupt:
self._shutdown()
代码甚至没有到达 print('Thread created...'),我不明白。从 this question 我知道 RabbitMQ 不是线程安全的,但我不明白 运行 RabbitMQ 的其他方式。
正如您已经发现的那样,Pika 不是线程安全的,但这不是您的程序阻塞的原因。
您的问题可能出在这里:
print('creating thread')
self._messaging_thread = Thread(target=self.run_consume())
#self._messaging_thread.start()
如果从 run_consume 中删除括号,它会不会更好?现在你实际上不是在创建一个线程,而是当场执行 self.run_consume()
并且它不会退出。
self._messaging_thread = Thread(target=self.run_consume)
这将是我的第一次尝试。
但是,由于 Pika 不是线程安全的,您还必须将频道创建和内容移至线程,而不是在主程序中执行。如果您不在其他任何地方使用它,它可能会起作用,但使用 Pika 的正确方法是绝对包含线程中的所有内容,而不是像您现在在这里所做的那样在线程之间共享任何 Pika 结构。
我正在使用 Quart 框架,但我还需要使用 RabbitMQ Pika 连接器,但我无法让它们很好地发挥作用,因为它们都有无限循环。
入口点:
from quart import Quart
from .service import Service
app = Quart(__name__)
@app.before_serving
async def startup():
app.service_task = asyncio.ensure_future(service.start())
if not service.initialise():
sys.exit()
服务Class:
class Service:
def __init__(self, new_instance):
self._connection = None
self._channel = None
self._messaging_thread = None
def initialise(self):
credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters('localhost', credentials=credentials)
self._connection = pika.BlockingConnection(parameters)
self._channel = self._connection.channel()
self._channel.queue_declare(queue='to_be_processed_queue')
self._channel.basic_consume(queue='to_be_processed_queue',
auto_ack=True,
on_message_callback=self.callback)
print('creating thread')
self._messaging_thread = Thread(target=self.run_consume())
#self._messaging_thread.start()
print('Thread created...')
def run_consume(self):
try:
self._channel.start_consuming()
except KeyboardInterrupt:
self._shutdown()
代码甚至没有到达 print('Thread created...'),我不明白。从 this question 我知道 RabbitMQ 不是线程安全的,但我不明白 运行 RabbitMQ 的其他方式。
正如您已经发现的那样,Pika 不是线程安全的,但这不是您的程序阻塞的原因。
您的问题可能出在这里:
print('creating thread')
self._messaging_thread = Thread(target=self.run_consume())
#self._messaging_thread.start()
如果从 run_consume 中删除括号,它会不会更好?现在你实际上不是在创建一个线程,而是当场执行 self.run_consume()
并且它不会退出。
self._messaging_thread = Thread(target=self.run_consume)
这将是我的第一次尝试。
但是,由于 Pika 不是线程安全的,您还必须将频道创建和内容移至线程,而不是在主程序中执行。如果您不在其他任何地方使用它,它可能会起作用,但使用 Pika 的正确方法是绝对包含线程中的所有内容,而不是像您现在在这里所做的那样在线程之间共享任何 Pika 结构。