Python 多处理如何在不使用 .join() 方法的情况下更新管理器列表中的复杂对象
Python multiprocessing how to update a complex object in a manager list without using .join() method
我大约 2 个月前在 Python 开始编程,在过去的两周里我一直在努力解决这个问题。
我知道有很多与此类似的线程,但我找不到适合我的情况的解决方案。
我需要一个与 Telegram 交互的主进程和另一个进程 buffer,它理解从主进程接收到的复杂对象并更新它。
我想用一种更简单、更流畅的方式来做这件事。
由于在没有 join() 方法的情况下使用多处理,目前对象未被更新。
然后我尝试改用多线程,但它给我带来了与 Pyrogram 的兼容性问题,这是我用来与 Telegram 交互的框架。
我再次写了我项目的 "complexity" 以重现我遇到的相同错误,并从每个人那里获得并为每个人提供最好的帮助。
a.py
class A():
def __init__(self, length = -1, height = -1):
self.length = length
self.height = height
b.py
from a import A
class B(A):
def __init__(self, length = -1, height = -1, width = -1):
super().__init__(length = -1, height = -1)
self.length = length
self.height = height
self.width = width
def setHeight(self, value):
self.height = value
c.py
class C():
def __init__(self, a, x = 0, y = 0):
self.a = a
self.x = x
self.y = y
def func1(self):
if self.x < 7:
self.x = 7
d.py
from c import C
class D(C):
def __init__(self, a, x = 0, y = 0, z = 0):
super().__init__(a, x = 0, y = 0)
self.a = a
self.x = x
self.y = y
self.z = z
def func2(self):
self.func1()
main.py
from b import B
from d import D
from multiprocessing import Process, Manager
from buffer import buffer
if __name__ == "__main__":
manager = Manager()
lizt = manager.list()
buffer = Process(target = buffer, args = (lizt, )) #passing the list as a parameter
buffer.start()
#can't invoke buffer.join() here because I need the below code to keep running while the buffer process takes a few minutes to end an instance passed in the list
#hence I can't wait the join() function to update the objects inside the buffer but i need objects updated in order to pop them out from the list
import datetime as dt
t = dt.datetime.now()
#library of kind of multithreading (pool of 4 processes), uses asyncio lib
#this while was put to reproduce the same error I am getting
while True:
if t + dt.timedelta(seconds = 10) < dt.datetime.now():
lizt.append(D(B(5, 5, 5)))
t = dt.datetime.now()
"""
#This is the code which looks like the one in my project
#main.py
from pyrogram import Client #library of kind of multithreading (pool of 4 processes), uses asyncio lib
from b import B
from d import D
from multiprocessing import Process, Manager
from buffer import buffer
if __name__ == "__main__":
api_id = 1234567
api_hash = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
app = Client("my_account", api_id, api_hash)
manager = Manager()
lizt = manager.list()
buffer = Process(target = buffer, args = (lizt, )) #passing the list as a parameter
buffer.start()
#can't invoke buffer.join() here because I need the below code to run at the same time as the buffer process
#hence I can't wait the join() function to update the objects inside the buffer
@app.on_message()
def my_handler(client, message):
lizt.append(complex_object_conatining_message)
"""
buffer.py
def buffer(buffer):
print("buffer was defined")
while True:
if len(buffer) > 0:
print(buffer[0].x) #prints 0
buffer[0].func2() #this changes the class attribute locally in the class instance but not in here
print(buffer[0].x) #prints 0, but I'd like it to be 7
print(buffer[0].a.height) #prints 5
buffer[0].a.setHeight(10) #and this has the same behaviour
print(buffer[0].a.height) #prints 5 but I'd like it to be 10
buffer.pop(0)
这是关于我遇到的问题的完整代码。
从字面上欢迎每一个建议,希望是建设性的,提前谢谢你!
最后不得不换个方式解决这个问题,就是像框架一样使用asyncio
此解决方案提供了我正在寻找的一切:
-复杂对象更新
-避免多处理问题(特别是 join())
也是:
-轻量级:在我有 2 个 python 进程之前 1) 大约 40K 2) 大约 75K
这个实际过程大约是 30K(而且更快更干净)
这是解决方案,我希望它对其他人有用,就像对我一样:
跳过类的部分,因为这个解决方案更新复杂对象绝对没问题
main.py
from pyrogram import Client
import asyncio
import time
def cancel_tasks():
#get all task in current loop
tasks = asyncio.Task.all_tasks()
for t in tasks:
t.cancel()
try:
buffer = []
firstWorker(buffer) #this one is the old buffer.py file and function
#the missing loop and loop method are explained in the next piece of code
except KeyboardInterrupt:
print("")
finally:
print("Closing Loop")
cancel_tasks()
firstWorker.py
import asyncio
def firstWorker(buffer):
print("First Worker Executed")
api_id = 1234567
api_hash = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
app = Client("my_account", api_id, api_hash)
@app.on_message()
async def my_handler(client, message):
print("Message Arrived")
buffer.append(complex_object_conatining_message)
await asyncio.sleep(1)
app.run(secondWorker(buffer)) #here is the trick: I changed the
#method run() of the Client class
#inside the Pyrogram framework
#since it was a loop itself.
#In this way I added another task
#to the existing loop in orther to
#let run both of them together.
我的secondWorker.py
import asyncio
async def secondWorker(buffer):
while True:
if len(buffer) > 0:
print(buffer.pop(0))
await asyncio.sleep(1)
可以在此处找到了解此代码中使用的 asyncio 的资源:
我大约 2 个月前在 Python 开始编程,在过去的两周里我一直在努力解决这个问题。 我知道有很多与此类似的线程,但我找不到适合我的情况的解决方案。
我需要一个与 Telegram 交互的主进程和另一个进程 buffer,它理解从主进程接收到的复杂对象并更新它。
我想用一种更简单、更流畅的方式来做这件事。
由于在没有 join() 方法的情况下使用多处理,目前对象未被更新。
然后我尝试改用多线程,但它给我带来了与 Pyrogram 的兼容性问题,这是我用来与 Telegram 交互的框架。
我再次写了我项目的 "complexity" 以重现我遇到的相同错误,并从每个人那里获得并为每个人提供最好的帮助。
a.py
class A():
def __init__(self, length = -1, height = -1):
self.length = length
self.height = height
b.py
from a import A
class B(A):
def __init__(self, length = -1, height = -1, width = -1):
super().__init__(length = -1, height = -1)
self.length = length
self.height = height
self.width = width
def setHeight(self, value):
self.height = value
c.py
class C():
def __init__(self, a, x = 0, y = 0):
self.a = a
self.x = x
self.y = y
def func1(self):
if self.x < 7:
self.x = 7
d.py
from c import C
class D(C):
def __init__(self, a, x = 0, y = 0, z = 0):
super().__init__(a, x = 0, y = 0)
self.a = a
self.x = x
self.y = y
self.z = z
def func2(self):
self.func1()
main.py
from b import B
from d import D
from multiprocessing import Process, Manager
from buffer import buffer
if __name__ == "__main__":
manager = Manager()
lizt = manager.list()
buffer = Process(target = buffer, args = (lizt, )) #passing the list as a parameter
buffer.start()
#can't invoke buffer.join() here because I need the below code to keep running while the buffer process takes a few minutes to end an instance passed in the list
#hence I can't wait the join() function to update the objects inside the buffer but i need objects updated in order to pop them out from the list
import datetime as dt
t = dt.datetime.now()
#library of kind of multithreading (pool of 4 processes), uses asyncio lib
#this while was put to reproduce the same error I am getting
while True:
if t + dt.timedelta(seconds = 10) < dt.datetime.now():
lizt.append(D(B(5, 5, 5)))
t = dt.datetime.now()
"""
#This is the code which looks like the one in my project
#main.py
from pyrogram import Client #library of kind of multithreading (pool of 4 processes), uses asyncio lib
from b import B
from d import D
from multiprocessing import Process, Manager
from buffer import buffer
if __name__ == "__main__":
api_id = 1234567
api_hash = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
app = Client("my_account", api_id, api_hash)
manager = Manager()
lizt = manager.list()
buffer = Process(target = buffer, args = (lizt, )) #passing the list as a parameter
buffer.start()
#can't invoke buffer.join() here because I need the below code to run at the same time as the buffer process
#hence I can't wait the join() function to update the objects inside the buffer
@app.on_message()
def my_handler(client, message):
lizt.append(complex_object_conatining_message)
"""
buffer.py
def buffer(buffer):
print("buffer was defined")
while True:
if len(buffer) > 0:
print(buffer[0].x) #prints 0
buffer[0].func2() #this changes the class attribute locally in the class instance but not in here
print(buffer[0].x) #prints 0, but I'd like it to be 7
print(buffer[0].a.height) #prints 5
buffer[0].a.setHeight(10) #and this has the same behaviour
print(buffer[0].a.height) #prints 5 but I'd like it to be 10
buffer.pop(0)
这是关于我遇到的问题的完整代码。 从字面上欢迎每一个建议,希望是建设性的,提前谢谢你!
最后不得不换个方式解决这个问题,就是像框架一样使用asyncio
此解决方案提供了我正在寻找的一切:
-复杂对象更新
-避免多处理问题(特别是 join())
也是:
-轻量级:在我有 2 个 python 进程之前 1) 大约 40K 2) 大约 75K
这个实际过程大约是 30K(而且更快更干净)
这是解决方案,我希望它对其他人有用,就像对我一样:
跳过类的部分,因为这个解决方案更新复杂对象绝对没问题
main.py
from pyrogram import Client
import asyncio
import time
def cancel_tasks():
#get all task in current loop
tasks = asyncio.Task.all_tasks()
for t in tasks:
t.cancel()
try:
buffer = []
firstWorker(buffer) #this one is the old buffer.py file and function
#the missing loop and loop method are explained in the next piece of code
except KeyboardInterrupt:
print("")
finally:
print("Closing Loop")
cancel_tasks()
firstWorker.py
import asyncio
def firstWorker(buffer):
print("First Worker Executed")
api_id = 1234567
api_hash = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
app = Client("my_account", api_id, api_hash)
@app.on_message()
async def my_handler(client, message):
print("Message Arrived")
buffer.append(complex_object_conatining_message)
await asyncio.sleep(1)
app.run(secondWorker(buffer)) #here is the trick: I changed the
#method run() of the Client class
#inside the Pyrogram framework
#since it was a loop itself.
#In this way I added another task
#to the existing loop in orther to
#let run both of them together.
我的secondWorker.py
import asyncio
async def secondWorker(buffer):
while True:
if len(buffer) > 0:
print(buffer.pop(0))
await asyncio.sleep(1)
可以在此处找到了解此代码中使用的 asyncio 的资源: