使用多处理在 class 内共享属性
Attribute sharing within a class using multiprocessing
我有一个 class,它有一个字典作为属性。在这个 class 中,我 运行 multiprocessing
填充一个 Queue
,然后使用额外的过程对 Queue
中的项目执行一些计算。当满足条件时,我将结果存储在这个字典中,但是我已经看到当进程想要在那里存储一个值时字典中不存在字典键。
class RNG():
def __init__(self):
self.mydict = {}
self.done = False
self.q = Queue(maxsize = 100)
self.lock = Lock()
def _fill_queue(self):
while not self.done:
rng = randint(1,9e6)
if rng % 2 ==0:
_type = 'even'
if 'even' not in self.mydict.keys():
self.mydict['even'] = []
else:
_type = 'odd'
if 'odd' not in self.mydict.keys():
self.mydict['odd'] = []
while self.q.full():
sleep(10)
self.lock.acquire()
self.q.put((_type,rng))
self.lock.release()
def _process_queue(self):
while not self.done:
self.lock.acquire()
if self.q.empty():
self.lock.release()
continue
_type,num = self.q.get()
self.lock.release()
print(f'Appending {_type} number!')
self.mydict[_type].append(num)
self._check_for_exit()
def _check_for_exit(self):
if len(self.mydict['odd']) >= 1e6 and len(self.mydict['even'])>=1e6:
self.done = True
def run(self):
jobs = []
p = Process(target = self._fill_queue)
jobs.append(p)
p.start()
for _ in range(5):
p = Process(target = self._process_queue)
jobs.append(p)
p.start()
for job in jobs:
job.join()
if __name__ == '__main__':
rng = RNG()
rng.run()
当我 运行 这样做时,我在尝试将数字追加到字典中时收到以下错误:
KeyError: 'even'
KeyError: 'odd'
为什么字典中没有添加关键字?而且,如果每个进程都设法写入一个文件并且该文件具有相同的名称,这是否意味着我需要实现某种信号量或 Pipe
?
请记住,这些 运行 是两个独立的进程。他们不共享内存。每个人都有自己的 RNG 实例副本,并且不会看到对方所做的更改。如果需要在它们之间进行通信,则需要使用Queue或者Pipe。
通常,多处理应用程序要建立一个“命令”对象在它们之间传递。该命令有某种动词,加上要执行的数据。所以,当你想添加一个密钥时,你可以发送 ('add','even') 或类似的东西。然后,您的队列处理程序可以执行多种不同的操作。
我有一个 class,它有一个字典作为属性。在这个 class 中,我 运行 multiprocessing
填充一个 Queue
,然后使用额外的过程对 Queue
中的项目执行一些计算。当满足条件时,我将结果存储在这个字典中,但是我已经看到当进程想要在那里存储一个值时字典中不存在字典键。
class RNG():
def __init__(self):
self.mydict = {}
self.done = False
self.q = Queue(maxsize = 100)
self.lock = Lock()
def _fill_queue(self):
while not self.done:
rng = randint(1,9e6)
if rng % 2 ==0:
_type = 'even'
if 'even' not in self.mydict.keys():
self.mydict['even'] = []
else:
_type = 'odd'
if 'odd' not in self.mydict.keys():
self.mydict['odd'] = []
while self.q.full():
sleep(10)
self.lock.acquire()
self.q.put((_type,rng))
self.lock.release()
def _process_queue(self):
while not self.done:
self.lock.acquire()
if self.q.empty():
self.lock.release()
continue
_type,num = self.q.get()
self.lock.release()
print(f'Appending {_type} number!')
self.mydict[_type].append(num)
self._check_for_exit()
def _check_for_exit(self):
if len(self.mydict['odd']) >= 1e6 and len(self.mydict['even'])>=1e6:
self.done = True
def run(self):
jobs = []
p = Process(target = self._fill_queue)
jobs.append(p)
p.start()
for _ in range(5):
p = Process(target = self._process_queue)
jobs.append(p)
p.start()
for job in jobs:
job.join()
if __name__ == '__main__':
rng = RNG()
rng.run()
当我 运行 这样做时,我在尝试将数字追加到字典中时收到以下错误:
KeyError: 'even'
KeyError: 'odd'
为什么字典中没有添加关键字?而且,如果每个进程都设法写入一个文件并且该文件具有相同的名称,这是否意味着我需要实现某种信号量或 Pipe
?
请记住,这些 运行 是两个独立的进程。他们不共享内存。每个人都有自己的 RNG 实例副本,并且不会看到对方所做的更改。如果需要在它们之间进行通信,则需要使用Queue或者Pipe。
通常,多处理应用程序要建立一个“命令”对象在它们之间传递。该命令有某种动词,加上要执行的数据。所以,当你想添加一个密钥时,你可以发送 ('add','even') 或类似的东西。然后,您的队列处理程序可以执行多种不同的操作。