更新 multiprocessing.Manager.dict() 中的对象
Update object inside a multiprocessing.Manager.dict()
我想知道如何更新分配为不同进程之间共享字典值的对象。我有以下 class:
class Task:
STATUS_PROCESSING = 0
STATUS_EXECUTING = 1
STATUS_QUEUED = 2
STATUS_TERMINATED = 3
STATUS_HALTED = 4
STATUS_STOPPED = 5
def __init__(self, id: str, uuid: str, options: dict):
self.id = id
self.uuid = uuid
self.options = options
self.state = 0
# Some properties...
def execute(self):
""" Executes the task
"""
# Set self status to Executing
self.state = Task.STATUS_EXECUTING
print('Executing...')
self.state = Task.STATUS_TERMINATED
它只是创建一个具有给定 ID 的新任务,并在调用 execute()
时执行其核心方法。我有另一个带有静态方法的 class 用于将新的一对 (id, task) 附加到字典,并读取字典执行其所有任务直到主程序停止:
class DummyList:
@staticmethod
def submit_task(d: dict, uuid: str, options: dict):
""" Submit a new task
"""
# If invalid UUID
if not Task.is_valid_uuid(uuid):
return False
# If more than 20 tasks
if len(d) > 19:
return False
# Create random ID (simplified for question)
r_id = str(random.randint(1, 2000000))
if r_id in d:
return False
# Add task to the dictionary
d[r_id] = Task(r_id, uuid, options)
# Set status to queue
d[r_id].state = Task.STATUS_QUEUED
# Return the created ID
return r_id
@staticmethod
def execute_forever(d):
try:
while True:
for i in d.values():
print(i.state)
i.execute()
time.sleep(5)
except KeyboardInterrupt:
pass
问题是 DummyList.execute_forever()
将从另一个进程调用,而主进程将执行 submit_task(...)
函数以添加新任务。像这样:
# Create a shared dict
m = multiprocessing.Manager()
shared_d = m.dict()
# Start the Task shared list execution in another process
p = multiprocessing.Process(target=DummyList.execute_forever, args=(shared_d,))
# Set the process to exit when the main halts
p.daemon = True
p.start()
........
# From another place
# The message variable is not important
DummyList.submit_task(shared_d, message['proc'], message['options'])
有效!任务已创建,分配给字典并执行,但以下几行(在上面的代码中看到)没有正确执行:
self.state = Task.STATUS_EXECUTING
self.state = Task.STATUS_TERMINATED
d[r_id].state = Task.STATUS_QUEUED
如果我们尝试在整个代码中写 ìf shared_d[<some_id>].state == 0
,它将始终是 True
,因为 属性 不会更新
我想那是因为共享字典在修改对象属性时没有更新,可能是因为字典只知道他必须在他的 getitem 或 [=34= 时更新]setitem 方法被调用。你知道有没有办法改变这种行为?
非常感谢!
我终于找到了解决办法。除非调用代理字典中的 __getitem__
或 __setitem__
方法,否则字典中的对象不会更新。这就是我更改以下行的原因:
任务
execute()
方法以 return self
结束。 self.state
必须在整个执行过程中更改。
任务管理器
方法更改为:
@staticmethod
def execute_forever(d):
""" Infinite loop reading the queued tasks and executing all of them.
"""
try:
while True:
# Notice the loop using the keys
for i in d.keys():
# Execute and re-assign item
d[i] = d[i].execute()
time.sleep(5)
except KeyboardInterrupt:
pass