RabbitMQ 和 python queue.Queue.get() 被卡住了

RabbitMQ and python queue.Queue.get() is stuck

我正在尝试把从 RabbitMQ 接收到的数据复制到另一个 queue.Queue() 中,以便在另一个线程里做其他处理。

def rgb_callback(ch, method, properties, body):
    """Queue one RabbitMQ message payload for the reader thread.

    Registered as the pika ``on_message_callback``. ``ch``, ``method`` and
    ``properties`` are supplied by pika and are not used here; ``body`` is
    the raw message payload.
    """
    # Interpret the payload as a flat array of unsigned bytes.
    rgb_color_bytes = np.frombuffer(body, dtype=np.uint8)
    # Blocks while READ_QUEUE is full, which also stalls the consuming loop.
    READ_QUEUE.put(item=rgb_color_bytes, block=True)

下面是消费者的配置代码:

def start_rgb_consume_from_rabbitmq():
    """Connect to RabbitMQ and consume RGB_QUEUE until stopped.

    Declares and purges RGB_QUEUE, registers ``rgb_callback`` as an
    auto-acknowledging consumer and then blocks in ``start_consuming()``.
    Any ``Exception`` is printed and swallowed; ``KeyboardInterrupt`` stops
    the consumer and raises ``SystemExit(0)``.
    """
    # Bound up front so the handlers can tell whether the channel was ever
    # created (the connection itself may be what failed).
    rgb_channel = None
    try:

        #       RABBITMQ PART       #

        connection = pika.BlockingConnection(pika.ConnectionParameters(host=HOST_NAME))
        # connection.add_callback_threadsafe(rgb_data_read_from_python_queue)
        rgb_channel = connection.channel()
        rgb_channel.queue_declare(queue=RGB_QUEUE)
        rgb_channel.queue_purge(queue=RGB_QUEUE)
        rgb_channel.basic_consume(queue=RGB_QUEUE, on_message_callback=rgb_callback, auto_ack=True)

        rgb_channel.start_consuming()

    except Exception as err:
        print("Exception :", err)
        if rgb_channel is not None:
            rgb_channel.stop_consuming()

    except KeyboardInterrupt:
        if rgb_channel is not None:
            rgb_channel.stop_consuming()
        sys.exit(0)

最后是我失败的 queue.Queue().get() 函数:

def rgb_data_read_from_python_queue():
    """Pull frames from READ_QUEUE forever, printing trace markers.

    "POINTER 1" is printed before each blocking ``get()`` and "POINTER 2"
    after a frame has been received.
    """
    if STATUS2:
        cv2.namedWindow(WINDOW_TITLE2, cv2.WINDOW_AUTOSIZE)
    while True:
        print("POINTER 1")
        try:
            # No timeout is given, so this waits until a frame is available.
            rgb_frame = READ_QUEUE.get(block=True)
        except queue.Empty:
            # Only reachable if a timeout is ever added to the get() above.
            rgb_frame = None

        # Compare against None explicitly: rgb_callback queues NumPy arrays,
        # and `not array` raises ValueError for multi-element arrays.
        if rgb_frame is None:
            continue

        print("POINTER 2")

它卡在那里。我是线程和队列架构方面的新手。我尝试过 add_callback_threadsafe(),并且我知道 get() 会阻塞调用它的线程。但是我在这里创建了 2 个不同的线程:

# Queue reader thread: created and started first.
rgb_data_thread = threading.Thread(target=rgb_data_read_from_python_queue)
rgb_data_thread.start()
# RabbitMQ consumer thread: created and started second.
consumer_thread = threading.Thread(target=start_rgb_consume_from_rabbitmq)
consumer_thread.start()

既然我创建了 2 个线程,为什么 queue.Queue().get() 还会阻塞另一个线程?感谢您的帮助。如果需要,我可以分享完整代码,它非常简单,将近 170 行。

我已经解决了这个问题。我把代码发布在这里,供那些想把数据放入 RabbitMQ 队列、由消费者读取后再放入 Python 队列、并在其他线程中进行处理的人参考。希望它能帮到别人。

#       RGB CONSUME     #

import numpy as np
import pika
import sys
import cv2
import queue
import threading

#       MACRO DEFINITIONS       #

RGB_QUEUE = 'RGBStream0'                # RabbitMQ queue the frames are consumed from
WINDOW_TITLE = 'RGB Stream Consumer1'   # OpenCV window opened by the consumer when STATUS is truthy
WINDOW_TITLE2 = 'From Python Queue'     # OpenCV window opened by the queue reader when STATUS2 is truthy
HOST_NAME = 'localhost'                 # RabbitMQ broker host
READ_QUEUE = queue.Queue(200)           # bounded hand-off queue (at most 200 pending frames)
CONSUMER_THREAD_NAME = 'ConsumerThread'  # not referenced in the visible code
THREAD_STOP_FLAG = False                 # not referenced in the visible code
TEST_FLAG = False                        # not referenced in the visible code


#       PARAMETER CHECK      #

# The status flags record whether frames are only distributed, or distributed
# and displayed at the same time.
STATUS = None
STATUS2 = None
if len(sys.argv) > 1:
    if sys.argv[1] == '-display':
        STATUS = False
        STATUS2 = True
    else:
        print("Gecersiz parametre")
        # sys.exit rather than the site-module `exit` helper, which is
        # intended for the interactive shell and may not be defined.
        sys.exit(1)


#       LOCAL FUNCTIONS     #

def rgb_callback(ch, method, properties, body):
    """Queue one RabbitMQ message payload for the reader thread and echo it.

    Used as the pika ``on_message_callback``. ``ch``, ``method`` and
    ``properties`` are supplied by pika and are not used; ``body`` is the
    raw message payload.
    """
    # View the payload as a flat array of unsigned bytes.
    frame_bytes = np.frombuffer(body, dtype=np.uint8)
    # Blocking put: waits for room when READ_QUEUE is full.
    READ_QUEUE.put(frame_bytes, block=True)
    print(frame_bytes)


def start_rgb_consume_from_rabbitmq():
    """Connect to RabbitMQ and consume RGB_QUEUE until stopped.

    Declares and purges RGB_QUEUE, registers ``rgb_callback`` as an
    auto-acknowledging consumer, prints a banner and then blocks in
    ``start_consuming()``.

    Any ``Exception`` is printed and swallowed. ``KeyboardInterrupt`` stops
    the consumer and raises ``SystemExit(0)``. In every case the connection
    is closed on the way out if it is still open.
    """
    # Bound up front so the handlers and the cleanup can tell how far setup
    # got; the connection attempt itself may be what failed.
    connection = None
    rgb_channel = None
    try:

        #       RABBITMQ PART       #

        connection = pika.BlockingConnection(pika.ConnectionParameters(host=HOST_NAME))
        rgb_channel = connection.channel()
        rgb_channel.queue_declare(queue=RGB_QUEUE)
        rgb_channel.queue_purge(queue=RGB_QUEUE)
        rgb_channel.basic_consume(queue=RGB_QUEUE, on_message_callback=rgb_callback, auto_ack=True)
        if STATUS:
            cv2.namedWindow(WINDOW_TITLE, cv2.WINDOW_AUTOSIZE)

        if STATUS or STATUS2:
            print(' *** Mesajlar bekleniyor *** Goruntuleme acik *** Cikmak icin CTRL+C ***')
        else:
            print(' *** Mesajlar bekleniyor *** Goruntuleme icin -display *** Cikmak icin CTRL+C ***')

        rgb_channel.start_consuming()

    except Exception as err:
        print("Exception :", err)
        if rgb_channel is not None:
            rgb_channel.stop_consuming()

    except KeyboardInterrupt:
        print('Interrupted ^^ Channel Kapatildi')
        if rgb_channel is not None:
            rgb_channel.stop_consuming()
        sys.exit(0)

    finally:
        # Release the broker connection instead of leaking it.
        if connection is not None and connection.is_open:
            connection.close()


def rgb_data_read_from_python_queue():
    """Read flat RGB frames from READ_QUEUE, reshape and optionally show them.

    Runs forever. Each frame's element count selects one of the supported
    resolutions (640x480, 1280x720, 1920x1080, three channels each). A frame
    of any other size prints a message and raises ``SystemExit(1)``.
    """
    # Flat element count -> (height, width, channels), built once up front.
    shapes_by_size = {
        height * width * 3: (height, width, 3)
        for height, width in ((480, 640), (720, 1280), (1080, 1920))
    }

    if STATUS2:
        cv2.namedWindow(WINDOW_TITLE2, cv2.WINDOW_AUTOSIZE)

    while True:
        # Waits until the consumer callback has queued a frame.
        rgb_frame = READ_QUEUE.get(block=True)

        frame_shape = shapes_by_size.get(rgb_frame.size)
        if frame_shape is None:
            print("Something wrong i can feel it")
            # sys.exit rather than the site-module `exit` helper, which is
            # intended for the interactive shell and may not be defined.
            sys.exit(1)

        rgb_data_reshaped = np.reshape(rgb_frame, frame_shape)

        if STATUS2:
            cv2.imshow(WINDOW_TITLE2, rgb_data_reshaped)
            # waitKey gives the OpenCV GUI a chance to process events.
            cv2.waitKey(1)


# Launch the queue reader first, then the RabbitMQ consumer, each on its own
# thread.
try:
    rgb_data_thread = threading.Thread(target=rgb_data_read_from_python_queue)
    rgb_data_thread.start()
    consumer_thread = threading.Thread(target=start_rgb_consume_from_rabbitmq)
    consumer_thread.start()
except KeyboardInterrupt:
    # Only covers an interrupt that arrives while the threads are starting.
    print('Interrupted')
    cv2.destroyAllWindows()
    sys.exit(0)