Using a Pika Channel across multiple threads

Although it is clear that a single Pika connection must not be used across multiple threads, can a channel obtained from that connection be used across multiple threads? I ran into the error below when I tried it, so the answer appears to be no. For reference:

INFO  2019-02-07 13:14:12,927 pika.connection _on_terminate  2095: Disconnected from RabbitMQ at 127.0.0.1:5672 (505): UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead

I am one of Pika's maintainers, and no, you cannot use a connection or a channel across threads. This is documented.
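
In practice this rule usually means that exactly one thread owns the connection and its channels, and every other thread hands work over to that owner instead of touching the channel itself. Pika's `BlockingConnection` offers `add_callback_threadsafe` for that kind of hand-off. The sketch below shows the same idea using only the standard library, so it runs without a broker. `FakeChannel`, `owner_loop` and `publish_from_any_thread` are hypothetical names invented for illustration and are not part of Pika.

```python
import queue
import threading


class FakeChannel:
    """Hypothetical stand-in for a Pika channel; records which thread used it."""

    def __init__(self):
        self.published = []

    def publish(self, body):
        # A real channel would write AMQP frames to the socket here.
        self.published.append((threading.current_thread().name, body))


def owner_loop(channel, commands):
    """Runs in exactly one thread: the only place the channel is touched."""
    while True:
        command = commands.get()
        if command is None:  # sentinel: stop the loop
            break
        command(channel)


def publish_from_any_thread(commands, body):
    """Safe to call from any thread: it only enqueues work for the owner."""
    commands.put(lambda channel: channel.publish(body))


channel = FakeChannel()
commands = queue.Queue()
owner = threading.Thread(
    target=owner_loop, args=(channel, commands), name="amqp-owner")
owner.start()

# Four producer threads request a publish; none of them touches the channel.
producers = [
    threading.Thread(target=publish_from_any_thread, args=(commands, f"msg-{i}"))
    for i in range(4)
]
for producer in producers:
    producer.start()
for producer in producers:
    producer.join()

commands.put(None)  # ask the owner thread to stop
owner.join()

# Every publish was executed by the owner thread.
print({thread_name for thread_name, _ in channel.published})
```

Because only the owner thread ever writes to the channel, frames belonging to different operations cannot be interleaved, which is the kind of interleaving the `UNEXPECTED_FRAME` error in the question is consistent with.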


NOTE: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.

I did it as follows.

Example of using a Pika consumer without blocking the thread: Pika and gRPC streaming

###########
import queue
import threading

    # Method of the gRPC servicer class (the enclosing class is not shown).
    def grpc_test(self, request, context):
        # Response-streaming gRPC implementation: the client gets a stream of messages.
        message_queue = queue.Queue()

        def rmq_callback(data):
            # Runs in the consumer thread; hands the message body to this generator.
            print("Got a callback from RMQ client")
            message_queue.put(data)

        # Register with RabbitMQ for data.
        # Thread safety: create a dedicated connection and channel here.
        pikaconsumer = TestConsumer()
        # The client wants to listen on this queue.
        pikaconsumer.listen_on_queue("xxxx", rmq_callback)
        # Use the connection and channel in a new thread (and in no other thread).
        t = threading.Thread(target=pikaconsumer.start_consuming)
        t.start()

        while True:
            # Block until the consumer thread delivers the next message body.
            data = message_queue.get(True)
            # proto is the generated protobuf module (its import is not shown).
            message = proto.Data()
            message.ParseFromString(data)
            yield message

###########

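import logging
from functools import partial

import pika

LOGGER = logging.getLogger(__name__)


class TestConsumer(object):

    def __init__(self):
        amqp_url = 'amqp://guest:guest@127.0.0.1:5672/'
        parameters = pika.URLParameters(amqp_url)
        # Keep a reference to the connection alongside its channel.
        self._connection = pika.BlockingConnection(parameters)
        self._channel = self._connection.channel()

    def listen_on_queue(self, queue_name, _callback, exchange=None, routing_keys=()):
        # Create the queue in case it does not exist yet.
        self._channel.queue_declare(queue=queue_name, auto_delete=True)
        # Bindings are optional, so listen_on_queue(queue_name, callback) also works.
        for routing_key in routing_keys:
            self._channel.queue_bind(queue_name, exchange, str(routing_key))
            LOGGER.info('Binding Exchange[%s] to Queue[%s] with RoutingKey[%s]',
                        exchange, queue_name, str(routing_key))

        def on_message(channel, method_frame, header_frame, body, callback=None):
            print(method_frame.delivery_tag)
            # Pass the body to the caller's callback, then acknowledge the delivery.
            callback(body)
            channel.basic_ack(delivery_tag=method_frame.delivery_tag)

        # Argument order (callback, queue) is the Pika 0.x form;
        # Pika 1.x expects basic_consume(queue, on_message_callback).
        self._consumer_tag = self._channel.basic_consume(
            partial(on_message, callback=_callback), queue_name)

    def start_consuming(self):
        # Blocks the calling thread; run it in the thread that owns this consumer.
        self._channel.start_consuming()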
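
The hand-off between the consumer thread and the `grpc_test` generator can be tried in isolation, without RabbitMQ or gRPC. The following is a minimal sketch under stated assumptions: `fake_consumer` is a hypothetical stand-in for `TestConsumer.start_consuming()`, and the `_STOP` sentinel is an addition that lets the generator finish, whereas the loop in the code above runs until the process ends.

```python
import queue
import threading

_STOP = object()  # sentinel marking the end of the stream (not in the original)


def fake_consumer(on_message, bodies):
    """Hypothetical stand-in for TestConsumer.start_consuming()."""
    for body in bodies:
        on_message(body)
    on_message(_STOP)


def stream_messages(bodies):
    """Generator in the style of grpc_test: yields what the consumer thread delivers."""
    message_queue = queue.Queue()
    worker = threading.Thread(
        target=fake_consumer, args=(message_queue.put, bodies), daemon=True)
    worker.start()
    while True:
        item = message_queue.get()  # blocks until the consumer thread delivers
        if item is _STOP:
            break
        yield item
    worker.join()


received = list(stream_messages([b"a", b"b", b"c"]))
print(received)  # [b'a', b'b', b'c']
```

`queue.Queue` is the only object shared between the two threads, and it is designed for concurrent use, so the consumer's channel stays confined to the thread that runs it.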