运行 multiprocessing/Multi 在 pika 消费者中线程化并将数据定向到特定数据帧

Running multiprocessing/Multi threading in pika consumer and directing the data to specific dataframe

我是 Python multi-threading/processing 和 RabbitMQ 的新手。基本上我有一个 RabbitMQ 消费者,它为我提供实时医院数据。每条消息都包含每位患者的生命体征。我需要为每位患者存储至少 5 条这样的消息,以便 运行 我的逻辑并用于触发警报。此外,由于患者数量未知,我正在考虑多线程或多处理,以使我的警报几乎实时并扩大规模。我的方法是为每个患者创建一个全局数据框,然后将与该患者相关的消息附加到数据框中。但现在我在创建 Multi thread/process 并将数据发送到相应的患者数据框时遇到了问题。这是我的代码


bed_list=[]
thread_list=[]
bed_df={}
alarms=0

def spo2(body,bed):
    body_data= body.decode()
    print(body_data)
    packet= json.loads(body_data)
    bed_id= packet['beds'][0]['bedId']
    if bed_id=bed:
        primary_attributes= json_normalize(packet)
         '''some logic'''
        global bed_df
        bed_df[bed_id]= bed_df[bed_id].append(packet) # creating the global dataframe to store five messages
        print(bed_df[bed_id])

        ''' some other calcuation'''

            phy_channel.basic_publish(body=json.dumps(truejson),exchange='nicu')# throwing out the alarm with another queue
            bed_df[bed_id]= bed_df[bed_id].tail(4)  # resets the size of the dataframe 


def callback(ch, method, properties, body):
    body_data= body.decode()
    packet= json.loads(body_data)
    bed_id= packet['beds'][0]['bedId']
    print(bed_id)
    global bed_list
    if bed_id not in bed_list:
      bed_list.append(bed_id)


#pseudo code
 for bed in bed_list:
     proc = Process(target=spo2, args=(bed,))
     procs.append(proc)
     proc.start()

我无法找到一种方法,我可以为每个患者创建一个 thread/process(bed_id),这样每当我收到该患者的消息时(bed_id ) 我可以将其定向到该线程。我已经检查了队列,但是关于实现这种情况的文档不是很清楚。

在你走这条路之前,你应该评估一下是否有必要。一个重要的限制是 rabbitmq bandwidth.

构建一个单线程应用程序,并开始为其提供合成的 rabbitmq 消息。提高 msg/s 速度,直到它跟不上为止。

如果该比率比实际可能发生的比率高得多,那么您就完蛋了。 :-)

如果不是,那么您开始分析您的应用程序以找出它的哪些部分花费的时间最多。这些是你的瓶颈。 只有知道瓶颈是什么,才能看相关代码,思考如何改进。

请注意 multiprocessingthreading 做不同的事情并且有不同的应用程序。如果您的应用程序受到它可以执行的计算量的限制,那么 multiprocessing 可以通过将计算分散到多个 CPU 核心来提供帮助。请注意,这仅在计算彼此 独立 时才有效。如果您的应用程序花费大量时间等待 I/O,threading 可以帮助您在一个线程中进行计算,而另一个线程正在等待 I/O.

但就复杂性而言,两者都不是免费的。例如,使用 threading 时,您必须使用锁来保护数据帧的读写,以便一次只有一个线程可以读取或修改所述数据帧。使用 multiprocessing 您必须将数据从工作进程发送回父进程。

在这种情况下,我认为 multiprocessing 最有用。您可以设置多个进程,每个进程负责 beds/patients 的一部分。如果 rabbitmq 可以有多个监听器,你可以让每个工作进程只处理来自它负责的患者的消息。否则,您必须将消息分发到适当的进程。每个工作进程现在为许多患者处理消息(并保留数据帧)。当根据对数据进行的计算触发警报时,工作人员只需向父进程发送一条消息,详细说明患者的标识符和警报的性质。