运行 multiprocessing/Multi 在 pika 消费者中线程化并将数据定向到特定数据帧
Running multiprocessing/Multi threading in pika consumer and directing the data to specific dataframe
我是 Python multi-threading/processing 和 RabbitMQ 的新手。基本上我有一个 RabbitMQ 消费者,它为我提供实时医院数据。每条消息都包含每位患者的生命体征。我需要为每位患者存储至少 5 条这样的消息,以便 运行 我的逻辑并用于触发警报。此外,由于患者数量未知,我正在考虑多线程或多处理,以使我的警报几乎实时并扩大规模。我的方法是为每个患者创建一个全局数据框,然后将与该患者相关的消息附加到数据框中。但现在我在创建 Multi thread/process 并将数据发送到相应的患者数据框时遇到了问题。这是我的代码
bed_list=[]
thread_list=[]
bed_df={}
alarms=0
def spo2(body,bed):
body_data= body.decode()
print(body_data)
packet= json.loads(body_data)
bed_id= packet['beds'][0]['bedId']
if bed_id=bed:
primary_attributes= json_normalize(packet)
'''some logic'''
global bed_df
bed_df[bed_id]= bed_df[bed_id].append(packet) # creating the global dataframe to store five messages
print(bed_df[bed_id])
''' some other calcuation'''
phy_channel.basic_publish(body=json.dumps(truejson),exchange='nicu')# throwing out the alarm with another queue
bed_df[bed_id]= bed_df[bed_id].tail(4) # resets the size of the dataframe
def callback(ch, method, properties, body):
body_data= body.decode()
packet= json.loads(body_data)
bed_id= packet['beds'][0]['bedId']
print(bed_id)
global bed_list
if bed_id not in bed_list:
bed_list.append(bed_id)
#pseudo code
for bed in bed_list:
proc = Process(target=spo2, args=(bed,))
procs.append(proc)
proc.start()
我无法找到一种方法,我可以为每个患者创建一个 thread/process(bed_id),这样每当我收到该患者的消息时(bed_id ) 我可以将其定向到该线程。我已经检查了队列,但是关于实现这种情况的文档不是很清楚。
在你走这条路之前,你应该评估一下是否有必要。一个重要的限制是 rabbitmq bandwidth.
构建一个单线程应用程序,并开始为其提供合成的 rabbitmq 消息。提高 msg/s 速度,直到它跟不上为止。
如果该比率比实际可能发生的比率高得多,那么您就完蛋了。 :-)
如果不是,那么您开始分析您的应用程序以找出它的哪些部分花费的时间最多。这些是你的瓶颈。
只有知道瓶颈是什么,才能看相关代码,思考如何改进。
请注意 multiprocessing
和 threading
做不同的事情并且有不同的应用程序。如果您的应用程序受到它可以执行的计算量的限制,那么 multiprocessing
可以通过将计算分散到多个 CPU 核心来提供帮助。请注意,这仅在计算彼此 独立 时才有效。如果您的应用程序花费大量时间等待 I/O,threading
可以帮助您在一个线程中进行计算,而另一个线程正在等待 I/O.
但就复杂性而言,两者都不是免费的。例如,使用 threading
时,您必须使用锁来保护数据帧的读写,以便一次只有一个线程可以读取或修改所述数据帧。使用 multiprocessing
您必须将数据从工作进程发送回父进程。
在这种情况下,我认为 multiprocessing
最有用。您可以设置多个进程,每个进程负责 beds/patients 的一部分。如果 rabbitmq 可以有多个监听器,你可以让每个工作进程只处理来自它负责的患者的消息。否则,您必须将消息分发到适当的进程。每个工作进程现在为许多患者处理消息(并保留数据帧)。当根据对数据进行的计算触发警报时,工作人员只需向父进程发送一条消息,详细说明患者的标识符和警报的性质。
我是 Python multi-threading/processing 和 RabbitMQ 的新手。基本上我有一个 RabbitMQ 消费者,它为我提供实时医院数据。每条消息都包含每位患者的生命体征。我需要为每位患者存储至少 5 条这样的消息,以便 运行 我的逻辑并用于触发警报。此外,由于患者数量未知,我正在考虑多线程或多处理,以使我的警报几乎实时并扩大规模。我的方法是为每个患者创建一个全局数据框,然后将与该患者相关的消息附加到数据框中。但现在我在创建 Multi thread/process 并将数据发送到相应的患者数据框时遇到了问题。这是我的代码
bed_list=[]
thread_list=[]
bed_df={}
alarms=0
def spo2(body,bed):
body_data= body.decode()
print(body_data)
packet= json.loads(body_data)
bed_id= packet['beds'][0]['bedId']
if bed_id=bed:
primary_attributes= json_normalize(packet)
'''some logic'''
global bed_df
bed_df[bed_id]= bed_df[bed_id].append(packet) # creating the global dataframe to store five messages
print(bed_df[bed_id])
''' some other calcuation'''
phy_channel.basic_publish(body=json.dumps(truejson),exchange='nicu')# throwing out the alarm with another queue
bed_df[bed_id]= bed_df[bed_id].tail(4) # resets the size of the dataframe
def callback(ch, method, properties, body):
body_data= body.decode()
packet= json.loads(body_data)
bed_id= packet['beds'][0]['bedId']
print(bed_id)
global bed_list
if bed_id not in bed_list:
bed_list.append(bed_id)
#pseudo code
for bed in bed_list:
proc = Process(target=spo2, args=(bed,))
procs.append(proc)
proc.start()
我无法找到一种方法,我可以为每个患者创建一个 thread/process(bed_id),这样每当我收到该患者的消息时(bed_id ) 我可以将其定向到该线程。我已经检查了队列,但是关于实现这种情况的文档不是很清楚。
在你走这条路之前,你应该评估一下是否有必要。一个重要的限制是 rabbitmq bandwidth.
构建一个单线程应用程序,并开始为其提供合成的 rabbitmq 消息。提高 msg/s 速度,直到它跟不上为止。
如果该比率比实际可能发生的比率高得多,那么您就完蛋了。 :-)
如果不是,那么您开始分析您的应用程序以找出它的哪些部分花费的时间最多。这些是你的瓶颈。 只有知道瓶颈是什么,才能看相关代码,思考如何改进。
请注意 multiprocessing
和 threading
做不同的事情并且有不同的应用程序。如果您的应用程序受到它可以执行的计算量的限制,那么 multiprocessing
可以通过将计算分散到多个 CPU 核心来提供帮助。请注意,这仅在计算彼此 独立 时才有效。如果您的应用程序花费大量时间等待 I/O,threading
可以帮助您在一个线程中进行计算,而另一个线程正在等待 I/O.
但就复杂性而言,两者都不是免费的。例如,使用 threading
时,您必须使用锁来保护数据帧的读写,以便一次只有一个线程可以读取或修改所述数据帧。使用 multiprocessing
您必须将数据从工作进程发送回父进程。
在这种情况下,我认为 multiprocessing
最有用。您可以设置多个进程,每个进程负责 beds/patients 的一部分。如果 rabbitmq 可以有多个监听器,你可以让每个工作进程只处理来自它负责的患者的消息。否则,您必须将消息分发到适当的进程。每个工作进程现在为许多患者处理消息(并保留数据帧)。当根据对数据进行的计算触发警报时,工作人员只需向父进程发送一条消息,详细说明患者的标识符和警报的性质。