Python3.6 有erlang风格的消息队列吗?
Python3.6 is there a erlang style message queue?
我正在寻找 python 3.6(这个确切版本)的消息队列实现,可用于 multiprocess.Process
之间的通信,具体来说,它应该是一个多生产者,单一消费者, fifo 优先接收应用程序特定类型的消息(例如,如果队列中间有系统消息(用 erlang 术语),而队列头部有正常消息,下一个接收应该 return系统消息而不是正常消息)
但我怀疑会有这样的库,所以问题就变成了,是否有任何 stdlib 或第三方库可以给我一大块共享内存或更好的列表,这样我就可以读写到支持的缓冲区但是 memory/list 并用 mp.Lock
?
之类的东西来保护秩序
multiprocessing.Manager
使用 tcp,并启动一个新进程
我不太熟悉 Erlang,但是,根据您描述需求的方式,我认为您可以采用 multiprocessing.Queue
的方法并在阅读之前对邮件进行排序。
想法是为每个进程设置一个multiprocessing.Queue
(FIFO 消息队列)。当进程 A 向进程 B 发送消息时,进程 A 将其消息连同消息的优先级放入进程 B 的消息队列中。当进程读取其消息时,它会将消息从 FIFO 队列传输到列表中,然后在处理消息之前对列表进行排序。消息首先按其优先级排序,然后是它们到达消息队列的时间。
这是一个已经在 Windows 上使用 Python 3.6 测试过的示例。
from multiprocessing import Process, Queue
import queue
import time
def handle_messages(process_id, message_queue):
# Keep track of the message number to ensure messages with the same priority
# are read in a FIFO fashion.
message_no = 0
messages = []
while True:
try:
priority, contents = message_queue.get_nowait()
messages.append((priority, message_no, contents))
message_no+=1
except queue.Empty:
break
# Handle messages in correct order.
for message in sorted(messages):
print("{}: {}".format(process_id, message[-1]))
def send_message_with_priority(destination_queue, message, priority):
# Send a message to a destination queue with a specified priority.
destination_queue.put((-priority,message))
def process_0(my_id, queues):
while True:
# Do work
print("Doing work...")
time.sleep(5)
# Receive messages
handle_messages(my_id, queues[my_id])
def process_1(my_id, queues):
message_no = 0
while True:
# Do work
time.sleep(1)
# Receive messages
handle_messages(my_id, queues[my_id])
send_message_with_priority(queues[0], "This is message {} from process {}".format(message_no, my_id), 1)
message_no+=1
def process_2(my_id, queues):
message_no = 0
while True:
# Do work
time.sleep(3)
# Receive messages
handle_messages(my_id, queues[my_id])
send_message_with_priority(queues[0], "This is urgent message {} from process {}".format(message_no, my_id), 2)
message_no+=1
if __name__ == "__main__":
qs = {i: Queue() for i in range(3)}
processes = [Process(target=p, args=(i, qs)) for i, p in enumerate([process_0, process_1, process_2])]
for p in processes:
p.start()
for p in processes:
p.join()
我正在寻找 python 3.6(这个确切版本)的消息队列实现,可用于 multiprocess.Process
之间的通信,具体来说,它应该是一个多生产者,单一消费者, fifo 优先接收应用程序特定类型的消息(例如,如果队列中间有系统消息(用 erlang 术语),而队列头部有正常消息,下一个接收应该 return系统消息而不是正常消息)
但我怀疑会有这样的库,所以问题就变成了,是否有任何 stdlib 或第三方库可以给我一大块共享内存或更好的列表,这样我就可以读写到支持的缓冲区但是 memory/list 并用 mp.Lock
?
multiprocessing.Manager
使用 tcp,并启动一个新进程
我不太熟悉 Erlang,但是,根据您描述需求的方式,我认为您可以采用 multiprocessing.Queue
的方法并在阅读之前对邮件进行排序。
想法是为每个进程设置一个multiprocessing.Queue
(FIFO 消息队列)。当进程 A 向进程 B 发送消息时,进程 A 将其消息连同消息的优先级放入进程 B 的消息队列中。当进程读取其消息时,它会将消息从 FIFO 队列传输到列表中,然后在处理消息之前对列表进行排序。消息首先按其优先级排序,然后是它们到达消息队列的时间。
这是一个已经在 Windows 上使用 Python 3.6 测试过的示例。
from multiprocessing import Process, Queue
import queue
import time
def handle_messages(process_id, message_queue):
# Keep track of the message number to ensure messages with the same priority
# are read in a FIFO fashion.
message_no = 0
messages = []
while True:
try:
priority, contents = message_queue.get_nowait()
messages.append((priority, message_no, contents))
message_no+=1
except queue.Empty:
break
# Handle messages in correct order.
for message in sorted(messages):
print("{}: {}".format(process_id, message[-1]))
def send_message_with_priority(destination_queue, message, priority):
# Send a message to a destination queue with a specified priority.
destination_queue.put((-priority,message))
def process_0(my_id, queues):
while True:
# Do work
print("Doing work...")
time.sleep(5)
# Receive messages
handle_messages(my_id, queues[my_id])
def process_1(my_id, queues):
message_no = 0
while True:
# Do work
time.sleep(1)
# Receive messages
handle_messages(my_id, queues[my_id])
send_message_with_priority(queues[0], "This is message {} from process {}".format(message_no, my_id), 1)
message_no+=1
def process_2(my_id, queues):
message_no = 0
while True:
# Do work
time.sleep(3)
# Receive messages
handle_messages(my_id, queues[my_id])
send_message_with_priority(queues[0], "This is urgent message {} from process {}".format(message_no, my_id), 2)
message_no+=1
if __name__ == "__main__":
qs = {i: Queue() for i in range(3)}
processes = [Process(target=p, args=(i, qs)) for i, p in enumerate([process_0, process_1, process_2])]
for p in processes:
p.start()
for p in processes:
p.join()