Python3.6 有erlang风格的消息队列吗?

Python3.6 is there a erlang style message queue?

我正在寻找 python 3.6(这个确切版本)的消息队列实现,可用于 multiprocess.Process 之间的通信,具体来说,它应该是一个多生产者,单一消费者, fifo 优先接收应用程序特定类型的消息(例如,如果队列中间有系统消息(用 erlang 术语),而队列头部有正常消息,下一个接收应该 return系统消息而不是正常消息)

但我怀疑会有这样的库,所以问题就变成了,是否有任何 stdlib 或第三方库可以给我一大块共享内存或更好的列表,这样我就可以读写到支持的缓冲区但是 memory/list 并用 mp.Lock?

之类的东西来保护秩序

multiprocessing.Manager 使用 tcp,并启动一个新进程

我不太熟悉 Erlang,但是,根据您描述需求的方式,我认为您可以采用 multiprocessing.Queue 的方法并在阅读之前对邮件进行排序。

想法是为每个进程设置一个multiprocessing.Queue(FIFO 消息队列)。当进程 A 向进程 B 发送消息时,进程 A 将其消息连同消息的优先级放入进程 B 的消息队列中。当进程读取其消息时,它会将消息从 FIFO 队列传输到列表中,然后在处理消息之前对列表进行排序。消息首先按其优先级排序,然后是它们到达消息队列的时间。

这是一个已经在 Windows 上使用 Python 3.6 测试过的示例。

from multiprocessing import Process, Queue
import queue
import time

def handle_messages(process_id, message_queue):
    # Keep track of the message number to ensure messages with the same priority
    # are read in a FIFO fashion.
    message_no = 0
    messages = []
    while True:
        try:
            priority, contents = message_queue.get_nowait()
            messages.append((priority, message_no, contents))
            message_no+=1
        except queue.Empty:
            break
    # Handle messages in correct order.
    for message in sorted(messages):
        print("{}: {}".format(process_id, message[-1]))
    
def send_message_with_priority(destination_queue, message, priority):
    # Send a message to a destination queue with a specified priority.
    destination_queue.put((-priority,message))

def process_0(my_id, queues):
    while True:
        # Do work
        print("Doing work...")
        time.sleep(5)
        # Receive messages
        handle_messages(my_id, queues[my_id])
            
def process_1(my_id, queues):
    message_no = 0
    while True:
        # Do work
        time.sleep(1)
        # Receive messages
        handle_messages(my_id, queues[my_id])
        send_message_with_priority(queues[0], "This is message {} from process {}".format(message_no, my_id), 1)
        message_no+=1
        
def process_2(my_id, queues):
    message_no = 0
    while True:
        # Do work
        time.sleep(3)
        # Receive messages
        handle_messages(my_id, queues[my_id])
        send_message_with_priority(queues[0], "This is urgent message {} from process {}".format(message_no, my_id), 2)
        message_no+=1

if __name__ == "__main__":
    qs = {i: Queue() for i in range(3)}
    processes = [Process(target=p, args=(i, qs)) for i, p in enumerate([process_0, process_1, process_2])]
    for p in processes:
        p.start()
    for p in processes:
        p.join()