Python multiprocessing: how to update a complex object in a manager list without using the .join() method

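I started programming in Python about two months ago, and I have been struggling with this problem for the past two weeks. I know there are many threads similar to this one, but I could not find a solution that fits my case.

I need a main process that interacts with Telegram and a second process, buffer, that understands the complex objects it receives from the main process and updates them.

I would like to do this in a simpler, smoother way.

At the moment the objects are not being updated, due to the use of multiprocessing without the join() method.

I then tried multithreading instead, but it gave me compatibility problems with Pyrogram, the framework I am using to interact with Telegram.

I rewrote the "complexity" of my project to reproduce the same error I am getting, so that I can get, and give, the best help possible.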

a.py

class A():
    def __init__(self, length = -1, height = -1):
        self.length = length
        self.height = height

b.py

from a import A
class B(A):
    def __init__(self, length = -1, height = -1, width = -1):
        super().__init__(length, height) #A stores length and height
        self.width = width

    def setHeight(self, value):
        self.height = value

c.py

class C():
    def __init__(self, a, x = 0, y = 0):
        self.a = a
        self.x = x
        self.y = y

    def func1(self):
        if self.x < 7:
            self.x = 7

d.py

from c import C
class D(C):
    def __init__(self, a, x = 0, y = 0, z = 0):
        super().__init__(a, x, y) #C stores a, x and y
        self.z = z

    def func2(self):
        self.func1()

main.py

import datetime as dt
from multiprocessing import Process, Manager

from b import B
from d import D
from buffer import buffer

if __name__ == "__main__":

    manager = Manager()
    lizt = manager.list()

    #passing the list as a parameter; the separate name avoids shadowing the imported buffer function
    buffer_process = Process(target = buffer, args = (lizt, ))
    buffer_process.start()
    #can't call buffer_process.join() here because the code below has to keep running
    #while the buffer process takes a few minutes to finish an instance passed in the list
    #hence I can't wait for join() to update the objects inside the buffer,
    #but I need the objects updated before they are popped from the list

    t = dt.datetime.now()

    #this loop stands in for Pyrogram (a kind of multithreading library, pool of 4 processes, built on asyncio)
    #it is only here to reproduce the same error I am getting

    while True:
        if t + dt.timedelta(seconds = 10) < dt.datetime.now():
            lizt.append(D(B(5, 5, 5)))
            t = dt.datetime.now()


"""
#This code resembles the one in my project

#main.py
from pyrogram import Client #a kind of multithreading library (pool of 4 processes), built on asyncio
from b import B
from d import D
from multiprocessing import Process, Manager
from buffer import buffer

if __name__ == "__main__":

    api_id = 1234567
    api_hash = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
    app = Client("my_account", api_id, api_hash)

    manager = Manager()
    lizt = manager.list()

    buffer_process = Process(target = buffer, args = (lizt, )) #passing the list as a parameter
    buffer_process.start()
    #can't call buffer_process.join() here because the code below has to run at the same time as the buffer process
    #hence I can't wait for join() to update the objects inside the buffer

@app.on_message()
def my_handler(client, message):
    lizt.append(complex_object_containing_message)
"""

buffer.py

def buffer(buffer):
    print("buffer was defined")
    while True:
        if len(buffer) > 0:
            print(buffer[0].x) #prints 0
            buffer[0].func2() #this changes the attribute on the instance returned by buffer[0], but the change does not show up in the list
            print(buffer[0].x) #prints 0, but I'd like it to be 7

            print(buffer[0].a.height) #prints 5
            buffer[0].a.setHeight(10) #and this behaves the same way
            print(buffer[0].a.height) #prints 5, but I'd like it to be 10

            buffer.pop(0)

That is the complete code for the problem I am facing. Every suggestion is welcome, hopefully a constructive one. Thank you in advance!
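For what it is worth, the behaviour in buffer.py does not appear to depend on join(). Indexing a manager list proxy returns a copy of the stored object, so a method called on buffer[0] only changes that copy. The multiprocessing documentation suggests assigning the modified object back to the container proxy so that the change is propagated. Below is a minimal sketch of that idea, assuming a SimpleNamespace as a hypothetical stand-in for the D instance and a made-up demo() helper. It runs in a single process, but the proxy should behave the same way inside the worker process.

```python
from multiprocessing import Manager
from types import SimpleNamespace


def demo():
    """Return the value of x read through the proxy at three points in time."""
    with Manager() as manager:
        shared = manager.list()
        shared.append(SimpleNamespace(x=0))  # hypothetical stand-in for a D instance

        before = shared[0].x

        # shared[0] hands back a copy, so this assignment never reaches the managed list.
        shared[0].x = 7
        after_in_place = shared[0].x

        # Fetch a copy, modify it, then store it back through the proxy.
        item = shared[0]
        item.x = 7
        shared[0] = item
        after_reassign = shared[0].x

        return before, after_in_place, after_reassign


if __name__ == "__main__":
    print(demo())  # (0, 0, 7)
```

Applied to buffer.py, this would mean fetching item = buffer[0], calling item.func2() and item.a.setHeight(10) on it, and then writing buffer[0] = item before the pop.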

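In the end I had to solve this a different way: by using asyncio, as the framework does.

This solution provides everything I was looking for:

- Complex objects are updated

- Multiprocessing problems are avoided (in particular join())

It is also:

- Lightweight: before, I had 2 Python processes, 1) about 40K and 2) about 75K

The current single process is about 30K (and it is faster and cleaner).

Here is the solution. I hope it is as useful to someone else as it was to me:

I am skipping the classes, because this solution updates complex objects without any problem.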

main.py

import asyncio

from firstWorker import firstWorker

def cancel_tasks():
    #get all tasks on the event loop and cancel them
    #asyncio.Task.all_tasks() was removed in Python 3.9; asyncio.all_tasks() replaces it
    tasks = asyncio.all_tasks(asyncio.get_event_loop())
    for t in tasks:
        t.cancel()

try:
    buffer = []
    firstWorker(buffer) #this one is the old buffer.py file and function
    #the missing loop and its run method are explained in firstWorker.py below
except KeyboardInterrupt:
    print("")
finally:
    print("Closing Loop")
    cancel_tasks()
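As a side note on the clean-up step, here is a small sketch of cancelling pending tasks from inside a running loop with asyncio.all_tasks(). It does not use Pyrogram; the worker() coroutine and the names w0 and w1 are made up for illustration.

```python
import asyncio


async def worker(log, name):
    """Sleep until cancelled, then record that the cancellation arrived."""
    try:
        while True:
            await asyncio.sleep(3600)
    except asyncio.CancelledError:
        log.append(name)
        raise  # re-raise so the task is actually marked as cancelled


async def main():
    log = []
    tasks = [asyncio.create_task(worker(log, f"w{i}")) for i in range(2)]
    await asyncio.sleep(0)  # give the workers a chance to start

    # Cancel everything on the loop except the task running main() itself.
    current = asyncio.current_task()
    pending = [t for t in asyncio.all_tasks() if t is not current]
    for t in pending:
        t.cancel()

    # Wait for the cancellations to be processed without raising here.
    await asyncio.gather(*pending, return_exceptions=True)
    return sorted(log), all(t.cancelled() for t in tasks)


if __name__ == "__main__":
    print(asyncio.run(main()))  # (['w0', 'w1'], True)
```

Waiting on the cancelled tasks with gather() gives each of them a chance to run its own clean-up before the loop closes.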

firstWorker.py

import asyncio

from pyrogram import Client
from secondWorker import secondWorker

def firstWorker(buffer):
    print("First Worker Executed")

    api_id = 1234567
    api_hash = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
    app = Client("my_account", api_id, api_hash)

    @app.on_message()
    async def my_handler(client, message):
        print("Message Arrived")
        buffer.append(complex_object_containing_message)
        await asyncio.sleep(1)

    app.run(secondWorker(buffer)) #here is the trick: I changed the
                                  #run() method of the Client class
                                  #inside the Pyrogram framework,
                                  #since it is a loop itself.
                                  #In this way I added another task
                                  #to the existing loop in order to
                                  #let both of them run together.

secondWorker.py

import asyncio
async def secondWorker(buffer):
    while True:
        if len(buffer) > 0:
            print(buffer.pop(0))

        await asyncio.sleep(1)
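The same pattern can be tried without Pyrogram. The sketch below is only an illustration under assumptions: Item is a hypothetical stand-in for the complex object, producer() plays the role of the message handler, and consumer() plays the role of secondWorker. Both coroutines run on one event loop in one process, so they share the very same objects and an in-place update is visible everywhere.

```python
import asyncio


class Item:
    """Hypothetical stand-in for the complex object built from a message."""

    def __init__(self, x=0):
        self.x = x

    def func1(self):
        if self.x < 7:
            self.x = 7


async def producer(buffer, created, count):
    """Append new objects to the shared list, like the message handler does."""
    for _ in range(count):
        item = Item()
        created.append(item)  # keep a reference to inspect the object later
        buffer.append(item)
        await asyncio.sleep(0)  # hand control back to the event loop


async def consumer(buffer, count):
    """Pop objects from the shared list and update them in place."""
    handled = 0
    while handled < count:
        if buffer:
            buffer.pop(0).func1()
            handled += 1
        await asyncio.sleep(0)


async def main(count=3):
    buffer, created = [], []
    await asyncio.gather(producer(buffer, created, count), consumer(buffer, count))
    return [item.x for item in created]


if __name__ == "__main__":
    print(asyncio.run(main()))  # [7, 7, 7]
```

No copy or pickling is involved here, which is why the update made by the consumer shows up on the objects the producer created.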

Resources for understanding the asyncio features used in this code can be found here:

Asyncio simple tutorial

Python Asyncio Official Documentation

This tutorial about how to fix classical Asyncio errors