RabbitMQ 和 python queue.Queue.get() 被卡住了
RabbitMQ and python queue.Queue.get() is stuck
我正在尝试将即将到来的数据复制到另一个 queue.Queue() 以在另一个线程中做其他事情。
def rgb_callback(ch, method, properties, body):
rgb_color_bytes = np.frombuffer(body, dtype=np.uint8)
READ_QUEUE.put(item=rgb_color_bytes, block=True)
和此处的配置行
def start_rgb_consume_from_rabbitmq():
try:
# RABBITMQ PART #
connection = pika.BlockingConnection(pika.ConnectionParameters(host=HOST_NAME))
# connection.add_callback_threadsafe(rgb_data_read_from_python_queue)
rgb_channel = connection.channel()
rgb_channel.queue_declare(queue=RGB_QUEUE)
rgb_channel.queue_purge(queue=RGB_QUEUE)
rgb_channel.basic_consume(queue=RGB_QUEUE, on_message_callback=rgb_callback, auto_ack=True)
rgb_channel.start_consuming()
except Exception as err:
print("Exception :", err)
rgb_channel.stop_consuming()
except KeyboardInterrupt:
rgb_channel.stop_consuming()
sys.exit(0)
最后是我失败的 queue.Queue().get() 函数:
def rgb_data_read_from_python_queue():
if STATUS2:
cv2.namedWindow(WINDOW_TITLE2, cv2.WINDOW_AUTOSIZE)
rgb_color_bytes = None
while True:
print("POINTER 1")
try:
rgb_frame = READ_QUEUE.get(block=True)
except queue.Empty:
rgb_frame = None
if not rgb_frame:
continue
print("POINTER 2")
它卡在那里。我是线程和队列架构方面的新手。我正在尝试 add_callbak_threadsafe()
并且我知道 get() 会阻塞线程。但是我在这里创建了 2 个不同的线程
rgb_data_thread = threading.Thread(target=rgb_data_read_from_python_queue)
consumer_thread = threading.Thread(target=start_rgb_consume_from_rabbitmq)
rgb_data_thread.start()
consumer_thread.start()
所以如果我创建了 2 个线程,为什么 queue.Queue().get() 会阻塞另一个线程。感谢您的帮助。我可以分享整个代码,它非常简单,将近 170 行。
我在这里解决了这个问题,我想为那些试图将数据放入 rabbitmq 队列并由消费者读取然后将其放入 python 队列并在其他线程上做一些事情的人发布.我希望它能帮助别人。
# RGB CONSUME #
import numpy as np
import pika
import sys
import cv2
import queue
import threading
# MACRO DEFINITIONS #
RGB_QUEUE = 'RGBStream0'
WINDOW_TITLE = 'RGB Stream Consumer1'
WINDOW_TITLE2 = 'From Python Queue'
HOST_NAME = 'localhost'
READ_QUEUE = queue.Queue(200)
CONSUMER_THREAD_NAME = 'ConsumerThread'
THREAD_STOP_FLAG = False
TEST_FLAG = False
# PARAMETER CHECK #
# Status degiskeni ile, sadece dagitim yapilmasi yada dagitim ve displayin aynı anda yapilmasi durumu saklanmakta.
STATUS = None
STATUS2 = None
if len(sys.argv) > 1:
if sys.argv[1] == '-display':
STATUS = False
STATUS2 = True
else:
print("Gecersiz parametre")
exit(1)
else:
pass
# LOCAL FUNCTIONS #
def rgb_callback(ch, method, properties, body):
rgb_color_bytes = np.frombuffer(body, dtype=np.uint8)
READ_QUEUE.put(item=rgb_color_bytes, block=True)
print(rgb_color_bytes)
def start_rgb_consume_from_rabbitmq():
try:
# RABBITMQ PART #
connection = pika.BlockingConnection(pika.ConnectionParameters(host=HOST_NAME))
rgb_channel = connection.channel()
rgb_channel.queue_declare(queue=RGB_QUEUE)
rgb_channel.queue_purge(queue=RGB_QUEUE)
rgb_channel.basic_consume(queue=RGB_QUEUE, on_message_callback=rgb_callback, auto_ack=True)
if STATUS:
cv2.namedWindow(WINDOW_TITLE, cv2.WINDOW_AUTOSIZE)
if STATUS or STATUS2:
print(' *** Mesajlar bekleniyor *** Goruntuleme acik *** Cikmak icin CTRL+C ***')
else:
print(' *** Mesajlar bekleniyor *** Goruntuleme icin -display *** Cikmak icin CTRL+C ***')
rgb_channel.start_consuming()
except Exception as err:
print("Exception :", err)
rgb_channel.stop_consuming()
except KeyboardInterrupt:
print('Interrupted ^^ Channel Kapatildi')
rgb_channel.stop_consuming()
sys.exit(0)
def rgb_data_read_from_python_queue():
if STATUS2:
cv2.namedWindow(WINDOW_TITLE2, cv2.WINDOW_AUTOSIZE)
while True:
rgb_frame = READ_QUEUE.get(block=True)
# 640 * 480
if rgb_frame.size == 921600:
rgb_data_reshaped = np.reshape(rgb_frame, [480, 640, 3])
# 1280 * 720
elif rgb_frame.size == 2764800:
rgb_data_reshaped = np.reshape(rgb_frame, [720, 1280, 3])
# 1920 * 1080
elif rgb_frame.size == 6220800:
rgb_data_reshaped = np.reshape(rgb_frame, [1080, 1920, 3])
else:
print("Something wrong i can feel it")
exit(1)
if STATUS2:
cv2.imshow(WINDOW_TITLE2, rgb_data_reshaped)
cv2.waitKey(1)
try:
rgb_data_thread = threading.Thread(target=rgb_data_read_from_python_queue)
consumer_thread = threading.Thread(target=start_rgb_consume_from_rabbitmq)
rgb_data_thread.start()
consumer_thread.start()
except KeyboardInterrupt:
print('Interrupted')
cv2.destroyAllWindows()
sys.exit(0)
我正在尝试将即将到来的数据复制到另一个 queue.Queue() 以在另一个线程中做其他事情。
def rgb_callback(ch, method, properties, body):
rgb_color_bytes = np.frombuffer(body, dtype=np.uint8)
READ_QUEUE.put(item=rgb_color_bytes, block=True)
和此处的配置行
def start_rgb_consume_from_rabbitmq():
try:
# RABBITMQ PART #
connection = pika.BlockingConnection(pika.ConnectionParameters(host=HOST_NAME))
# connection.add_callback_threadsafe(rgb_data_read_from_python_queue)
rgb_channel = connection.channel()
rgb_channel.queue_declare(queue=RGB_QUEUE)
rgb_channel.queue_purge(queue=RGB_QUEUE)
rgb_channel.basic_consume(queue=RGB_QUEUE, on_message_callback=rgb_callback, auto_ack=True)
rgb_channel.start_consuming()
except Exception as err:
print("Exception :", err)
rgb_channel.stop_consuming()
except KeyboardInterrupt:
rgb_channel.stop_consuming()
sys.exit(0)
最后是我失败的 queue.Queue().get() 函数:
def rgb_data_read_from_python_queue():
if STATUS2:
cv2.namedWindow(WINDOW_TITLE2, cv2.WINDOW_AUTOSIZE)
rgb_color_bytes = None
while True:
print("POINTER 1")
try:
rgb_frame = READ_QUEUE.get(block=True)
except queue.Empty:
rgb_frame = None
if not rgb_frame:
continue
print("POINTER 2")
它卡在那里。我是线程和队列架构方面的新手。我正在尝试 add_callbak_threadsafe()
并且我知道 get() 会阻塞线程。但是我在这里创建了 2 个不同的线程
rgb_data_thread = threading.Thread(target=rgb_data_read_from_python_queue)
consumer_thread = threading.Thread(target=start_rgb_consume_from_rabbitmq)
rgb_data_thread.start()
consumer_thread.start()
所以如果我创建了 2 个线程,为什么 queue.Queue().get() 会阻塞另一个线程。感谢您的帮助。我可以分享整个代码,它非常简单,将近 170 行。
我在这里解决了这个问题,我想为那些试图将数据放入 rabbitmq 队列并由消费者读取然后将其放入 python 队列并在其他线程上做一些事情的人发布.我希望它能帮助别人。
# RGB CONSUME #
import numpy as np
import pika
import sys
import cv2
import queue
import threading
# MACRO DEFINITIONS #
RGB_QUEUE = 'RGBStream0'
WINDOW_TITLE = 'RGB Stream Consumer1'
WINDOW_TITLE2 = 'From Python Queue'
HOST_NAME = 'localhost'
READ_QUEUE = queue.Queue(200)
CONSUMER_THREAD_NAME = 'ConsumerThread'
THREAD_STOP_FLAG = False
TEST_FLAG = False
# PARAMETER CHECK #
# Status degiskeni ile, sadece dagitim yapilmasi yada dagitim ve displayin aynı anda yapilmasi durumu saklanmakta.
STATUS = None
STATUS2 = None
if len(sys.argv) > 1:
if sys.argv[1] == '-display':
STATUS = False
STATUS2 = True
else:
print("Gecersiz parametre")
exit(1)
else:
pass
# LOCAL FUNCTIONS #
def rgb_callback(ch, method, properties, body):
rgb_color_bytes = np.frombuffer(body, dtype=np.uint8)
READ_QUEUE.put(item=rgb_color_bytes, block=True)
print(rgb_color_bytes)
def start_rgb_consume_from_rabbitmq():
try:
# RABBITMQ PART #
connection = pika.BlockingConnection(pika.ConnectionParameters(host=HOST_NAME))
rgb_channel = connection.channel()
rgb_channel.queue_declare(queue=RGB_QUEUE)
rgb_channel.queue_purge(queue=RGB_QUEUE)
rgb_channel.basic_consume(queue=RGB_QUEUE, on_message_callback=rgb_callback, auto_ack=True)
if STATUS:
cv2.namedWindow(WINDOW_TITLE, cv2.WINDOW_AUTOSIZE)
if STATUS or STATUS2:
print(' *** Mesajlar bekleniyor *** Goruntuleme acik *** Cikmak icin CTRL+C ***')
else:
print(' *** Mesajlar bekleniyor *** Goruntuleme icin -display *** Cikmak icin CTRL+C ***')
rgb_channel.start_consuming()
except Exception as err:
print("Exception :", err)
rgb_channel.stop_consuming()
except KeyboardInterrupt:
print('Interrupted ^^ Channel Kapatildi')
rgb_channel.stop_consuming()
sys.exit(0)
def rgb_data_read_from_python_queue():
if STATUS2:
cv2.namedWindow(WINDOW_TITLE2, cv2.WINDOW_AUTOSIZE)
while True:
rgb_frame = READ_QUEUE.get(block=True)
# 640 * 480
if rgb_frame.size == 921600:
rgb_data_reshaped = np.reshape(rgb_frame, [480, 640, 3])
# 1280 * 720
elif rgb_frame.size == 2764800:
rgb_data_reshaped = np.reshape(rgb_frame, [720, 1280, 3])
# 1920 * 1080
elif rgb_frame.size == 6220800:
rgb_data_reshaped = np.reshape(rgb_frame, [1080, 1920, 3])
else:
print("Something wrong i can feel it")
exit(1)
if STATUS2:
cv2.imshow(WINDOW_TITLE2, rgb_data_reshaped)
cv2.waitKey(1)
try:
rgb_data_thread = threading.Thread(target=rgb_data_read_from_python_queue)
consumer_thread = threading.Thread(target=start_rgb_consume_from_rabbitmq)
rgb_data_thread.start()
consumer_thread.start()
except KeyboardInterrupt:
print('Interrupted')
cv2.destroyAllWindows()
sys.exit(0)