从线程消耗的鼠兔忽略了 KeyboardInterrupt

pika consuming from thread ignores KeyboardInterrupt

我正在尝试 运行 使用来自 github 的 pika 0.10.0 进行以下测试:

import logging
import sys
import pika
import threading

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
URL = 'amqp://guest:guest@127.0.0.1:5672/%2F?socket_timeout=0.25'


class BaseConsumer(threading.Thread):

    def run(self):
        self._connection = None
        self._channel = None
        self.connect()
        self.open_channel()
        self.consume()

    def connect(self):
        parameters = pika.URLParameters(URL)
        self._connection = pika.BlockingConnection(parameters)

    def open_channel(self):
        self._channel = self._connection.channel()
        self._channel.exchange_declare(exchange='exc1', exchange_type='topic', passive=False,
                                       durable=False, auto_delete=False, internal=False, arguments=None)
        self._channel.queue_declare(queue='test', passive=False, durable=False,
                                    exclusive=False, auto_delete=False, arguments=None)
        self._channel.queue_bind(
            'test', 'exc1', routing_key='rk', arguments=None)

    def consume(self):
        self._channel.basic_consume(self.on_message, 'test')
        try:
            self._channel.start_consuming()
        except KeyboardInterrupt:
            logging.info("Stop consuming now!")
            self._channel.stop_consuming()
        self._connection.close()

    def on_message(self, channel, method_frame, header_frame, body):
        print method_frame.delivery_tag
        print body
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)

c1 = BaseConsumer()
c1.setDaemon(False)
c1.start()

脚本正在连接到我的 MQ,并且显然能够使用来自 MQ 的消息。问题是我无法停止线程。在我的键盘上按 CTRL-C 只会导致“^C”出现在控制台中而不会中断消费。

问题是,如何让 pika 在线程中 运行ning 时停止消耗?我想指出,我正在遵循在消费者线程中创建连接的准则。

如果在使用 c1.start() 启动线程后,我还执行无限 while 循环并从那里记录一些内容,然后按 CTRL-C 将结束 while 循环,但消费者线程仍将忽略任何额外的 CTRL -C.

附带问题:是否可以通过 threading.Condition 之类的外部信号停止线程内部的消费?我看不出如何干扰 start_consuming.

Question: ... from there then pressing CTRL-C will end the while loop

def stop() 添加到您的 BaseConsumer,
抓住 KeyboardInterrupt 并调用 stop().

try: 
    BaseConsumer.run() 
except KeyboardInterrupt: 
    BaseConsumer.stop()

阅读pika: asynchronous_consumer_example