Modify a list in a multiprocessing Pool's Manager dict
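I have a list of elements that I am processing in a multiprocessing apply_async task. As each element is processed, I update a list stored under a key in a manager dict, and I want that key to end up holding the whole processed list.
I tried the following code: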
#!/usr/bin/python
from multiprocessing import Pool, Manager

def spammer_task(d, my_list):
    #Initialize manager dict
    d['task'] = {
        'processed_list': []
    }
    for ele in my_list:
        #process here
        d['task']['processed_list'].append(ele)
    return

p = Pool()
m = Manager()
d = m.dict()
my_list = ["one", "two", "three"]
p.apply_async(spammer_task (d, my_list))
print d
In the end it just leaves an empty list in the dict.
Output:
{'task': {'processed_list': []}}
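After some research, I learned that elements inside a manager dict become immutable, so you have to re-initialize the whole dict with new data in order to update it. So I tried the code below, but it gives a strange error.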
#!/usr/bin/python
from multiprocessing import Pool, Manager

def spammer_task(d, my_list):
    #Initialize manager dict
    d['task'] = {
        'processed_list': []
    }
    for ele in my_list:
        #process here
        old_list = d['task']['processed_list']
        new_list = old_list.append(ele)
        #Have to do it this way since elements inside a manager dict become
        #immutable so
        d['task'] = {
            'processed_list': new_list
        }
    return

p = Pool()
m = Manager()
d = m.dict()
my_list = ["one", "two", "three"]
p.apply_async(spammer_task (d, my_list))
print d
Output:
Traceback (most recent call last):
  File "./a.py", line 29, in <module>
    p.apply_async(spammer_task (d, my_list))
  File "./a.py", line 14, in spammer_task
    new_list = old_list.append(ele)
AttributeError: 'NoneType' object has no attribute 'append'
It seems to be appending None to the list somehow, but I can't figure out why.
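The traceback is consistent with how `list.append` behaves: it mutates the list in place and returns `None`. After the first pass `new_list` is therefore `None`, that `None` is stored under `'processed_list'`, and the second pass calls `.append` on it. A minimal sketch of the difference, with helper names made up purely for illustration:

```python
def append_in_place(items, ele):
    """Mimic the failing line: keep the return value of list.append."""
    return items.append(ele)  # append mutates items and returns None


def append_as_new_list(items, ele):
    """Build a new list instead, leaving the original untouched."""
    return items + [ele]


old_list = []
new_list = append_in_place(old_list, "one")
print(old_list)  # ['one']
print(new_list)  # None

print(append_as_new_list(["one"], "two"))  # ['one', 'two']
```

So the element is appended to `old_list`, but the name `new_list` never refers to a list.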
Following the workaround in https://bugs.python.org/issue6766, the code below fixes it by copying the whole task dict, modifying the copy, and then assigning it back:
#!/usr/bin/python
from multiprocessing import Pool, Manager

def spammer_task(d, my_list):
    #Initialize manager dict
    d['task'] = {
        'processed_list': []
    }
    for ele in my_list:
        #process here
        foo = d['task']
        foo['processed_list'].append(ele)
        d['task'] = foo
    return

p = Pool()
m = Manager()
d = m.dict()
my_list = ["one", "two", "three"]
p.apply_async(spammer_task (d, my_list))
print d
Output:
{'task': {'processed_list': ['one', 'two', 'three']}}
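Why the copy-and-reassign version works can be checked without a pool at all. Reading `d['task']` through the proxy returns a plain dict that is a copy living in the calling process, so changes to it are not sent to the manager; only an assignment through the proxy is. The sketch below assumes Python 3 (for the `with Manager()` form), and `nested_update_demo` is a hypothetical helper name:

```python
from multiprocessing import Manager


def nested_update_demo():
    """Return the stored list after an in-place append and after a reassignment."""
    with Manager() as m:
        d = m.dict()
        d['task'] = {'processed_list': []}

        # d['task'] hands back a local copy; this append never reaches the manager.
        d['task']['processed_list'].append('lost')
        after_in_place = d['task']['processed_list']

        # Fetch, modify, then assign back through the proxy to publish the change.
        task = d['task']
        task['processed_list'].append('kept')
        d['task'] = task
        after_reassign = d['task']['processed_list']

    return after_in_place, after_reassign


if __name__ == "__main__":
    print(nested_update_demo())  # ([], ['kept'])
```

In other words, the nested values are not immutable; in-place changes to them are simply invisible to the manager until the key is assigned again.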
Aside from making sure that d actually contains something by the time it is printed, the result is still {'task': {'processed_list': ['one', 'two', 'three']}}
#!/usr/bin/python
from multiprocessing import Pool

def spammer_task(my_list):
    #Build the result in a plain local dict (no manager dict needed)
    out = {
        'processed_list': []
    }
    for ele in my_list:
        #process here
        out['processed_list'].append(ele)
    return 'task', out

my_list = ["one", "two", "three"]

if __name__ == "__main__":
    p = Pool()
    d = dict(p.imap_unordered(spammer_task, [my_list]))  #this line blocks until finished
    print d
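Two details in the manager-based attempts are worth separating from the nested-update issue. `p.apply_async(spammer_task (d, my_list))` calls `spammer_task` in the parent process and hands its return value to `apply_async`, which is why the traceback points at the main script, and nothing waits for a task before printing. A hedged Python 3 sketch that passes the function and its arguments separately and waits with `get()` (`run_spammer` is a hypothetical wrapper name, and a single worker is assumed for simplicity):

```python
from multiprocessing import Manager, Pool


def spammer_task(d, my_list):
    """Worker: record each processed element under d['task']."""
    d['task'] = {'processed_list': []}
    for ele in my_list:
        # process here
        task = d['task']                    # local copy of the nested dict
        task['processed_list'].append(ele)
        d['task'] = task                    # reassign so the manager sees it


def run_spammer(my_list):
    """Run spammer_task in a pool worker and return the result as a plain dict."""
    with Manager() as m, Pool(processes=1) as p:
        d = m.dict()
        # Pass the callable and its arguments separately; do not call it here.
        result = p.apply_async(spammer_task, args=(d, my_list))
        result.get()      # block until the worker finishes; re-raises its errors
        return dict(d)    # copy out before the manager shuts down


if __name__ == "__main__":
    print(run_spammer(["one", "two", "three"]))
```

Compared with returning the data from the worker as in the answer above, this keeps the shared dict, which is mainly useful when other processes need to read progress while the task is still running.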