Mutate a shared object in Python Multiprocessing
Suppose there is a dummy.txt file containing the following information:
a_to_b_from_c 20
a_to_b_from_w 30
a_to_b_from_k 20
a_to_b_from_l 10
c_to_f_from_e 30
c_to_f_from_k 20
c_to_f_from_l 10
(the only values are 10, 20 and 30) and the following code:
import multiprocessing

global conns

class Line():
    def __init__(self, text, group) -> None:
        self.text = text
        self.group = int(group)

    def get_link(self):
        return self.text.split('_from')[0]

    def __repr__(self):
        return f"<{self.group},{self.text}>"

class Groups():
    def __init__(self, name) -> None:
        self.name = name
        self.groups = {k: set() for k in [10, 20, 30]}

    def add_to_dict(self, line: Line):
        connection = line.get_link()
        if connection not in self.groups.keys():
            self.groups[connection] = set()
        self.groups[connection].add(line.text)

def thread_f(item: Line):
    # Update the dictionary of every Group object accordingly
    global conns
    key = item.get_link()
    conns[key].add_to_dict(item)

def main():
    global conns
    # Parse the file and store the information in an iterable
    with open('dummy.txt') as f:
        info = [Line(*line.strip().split()) for line in f]
    # Update the global (shared) object and initialize a dictionary
    # this has the following initialization:
    # { a_to_b : set(), c_to_f : set() }
    conns = {k: Groups(k) for k in {x.get_link() for x in info}}
    # Update the shared object according to the iterable information
    with multiprocessing.Pool(5) as pool:
        res = pool.map(thread_f,     # add to the appropriate key the items
                       info,         # the lines
                       chunksize=1)  # respect the order
    # Display the Results
    for group_name, group_obj in conns.items():
        print(f"Grp Name {group_name} has the following:")
        for cost, connections in group_obj.groups.items():
            print(cost, connections)

if __name__ == "__main__":
    main()
What I am trying to do is first parse the file and generate a Line object for every line of the file. Once parsing is done, I update the global variable conns, which I intend to use as a shared variable for all the workers in the pool. Then, in the thread_f function, I update the global variable (a dictionary) by adding the corresponding Line to the dictionary field of the appropriate Group object.
The problem is that when I try to display the information, nothing shows up. Instead, I get a collection of empty sets:
Grp Name a_to_b has the following:
10 set()
20 set()
30 set()
Grp Name c_to_f has the following:
10 set()
20 set()
30 set()
What I was expecting instead is:
Grp Name a_to_b has the following
10 set(a_to_b_from_l)
20 set(a_to_b_from_c,a_to_b_from_k)
30 set(a_to_b_from_w)
Grp Name c_to_f has the following:
10 set(c_to_f_from_l)
20 set(c_to_f_from_k)
30 set(c_to_f_from_e)
Since Python multiprocessing is essentially a fork approach, I understand that the child processes do have access to the information already initialized by the parent, but their changes have no effect on the parent. After reading the documentation and searching on S.O., I found the Manager objects of the multiprocessing package. The problem is that I could not figure out how to create an already-initialized Manager.dict() (the way I do with conns, as I understand it).
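As a minimal, self-contained illustration (not part of the original post) of why the children's updates never reach the parent: with the fork start method every worker process gets its own copy of the parent's data, so in-place mutations stay in the child.

import multiprocessing

data = {"a_to_b": set()}          # initialized in the parent before the pool starts

def worker(x):
    data["a_to_b"].add(x)         # mutates the child's private copy only
    return sorted(data["a_to_b"])

if __name__ == "__main__":
    with multiprocessing.Pool(2) as pool:
        print(pool.map(worker, ["c", "w"]))   # the children do see (copies of) data
    print(data)                               # the parent still prints {'a_to_b': set()}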
How can I achieve the desired behavior described above?
Yes, but why multiprocessing?
Well, this example is just an MWE I created to mimic what my actual code does. In fact, I am trying to speed up code that does not scale well with very large input files.
Regarding Manager
There is a somewhat similar question here [1], but I did not manage to find a way to initialize a Manager.dict() from a pre-existing, i.e. already initialized, dictionary and pass it to the spawned processes. Hence, I used sets to guarantee that there will be no duplicate entries, and a global, already-initialized variable that is continuously updated by the processes.
A workaround for the proposed Manager approach?
Well, since the computational overhead grows with the use of a shared resource (i.e. the Manager.dict object), could the following workaround be used instead?
The idea is:
- avoid using a shared resource across the processes
- and hence, avoid race conditions
So, suppose our pool of workers contains X processes in total. The task of each worker is to classify each Line into the appropriate Group. The Lines are provided to the workers as a list[Line] object. So, what if we used a divide-and-conquer approach like this:
+--------------------------------------------------------------+
| list[Line] |
+--------------------------------------------------------------+
| | | | | | | |
| | | | | | | |
<-d-> <-d-> <-d-> ... <-d->
The list is divided into X independent chunks/slices/sublists of size len(list)/X. Then, each of these iterables is handed to a worker for processing. But now the thread_f function has to be modified accordingly.
It would:
- generate a dictionary of Groups where key = line.group and value = Group object
- populate this dictionary based on the Line objects of the chunk/slice/sublist it was given
- return the dictionary
After the pool procs have finished, the results, i.e. the dictionaries, would have to be merged into one final solution (a minimal sketch of this idea follows).
First of all, I believe your method Groups.add_to_dict is in error. I have commented out the erroneous statement and followed it with the correct one:
import multiprocessing

def init_processes(d, the_lock):
    global conns, lock
    conns, lock = d, the_lock

class Line():
    def __init__(self, text, group) -> None:
        self.text = text
        self.group = int(group)

    def get_link(self):
        return self.text.split('_from')[0]

    def __repr__(self):
        return f"<{self.group},{self.text}>"

class Groups():
    def __init__(self, name) -> None:
        self.name = name
        self.groups = {k: set() for k in [10, 20, 30]}

    def add_to_dict(self, line: Line):
        #connection = line.get_link()
        connection = line.group
        if connection not in self.groups.keys():
            self.groups[connection] = set()
        self.groups[connection].add(line.text)

def thread_f(item: Line):
    # Update the dictionary of every Group object accordingly
    global conns  # Not strictly necessary
    key = item.get_link()
    # We need to let the managed dict know there is an updated value for the key:
    """
    conns[key].add_to_dict(item)
    """
    with lock:
        the_set = conns[key]
        the_set.add_to_dict(item)
        conns[key] = the_set  # reset the reference

def main():
    # Parse the file and store the information in an iterable
    with open('dummy.txt') as f:
        info = [Line(*line.strip().split()) for line in f]
    # Update the global (shared) object and initialize a dictionary
    # this has the following initialization:
    # { a_to_b : set(), c_to_f : set() }
    conns = multiprocessing.Manager().dict(
        {k: Groups(k) for k in {x.get_link() for x in info}}
    )
    # Update the shared object according to the iterable information
    lock = multiprocessing.Lock()
    with multiprocessing.Pool(5, initializer=init_processes, initargs=(conns, lock)) as pool:
        res = pool.map(thread_f,     # add to the appropriate key the items
                       info,         # the lines
                       chunksize=1)  # respect the order
    # Display the Results
    for group_name, group_obj in conns.items():
        print(f"Grp Name {group_name} has the following:")
        for cost, connections in group_obj.groups.items():
            print(cost, connections)

if __name__ == "__main__":
    main()
Prints:
Grp Name c_to_f has the following:
10 {'c_to_f_from_l'}
20 {'c_to_f_from_k'}
30 {'c_to_f_from_e'}
Grp Name a_to_b has the following:
10 {'a_to_b_from_l'}
20 {'a_to_b_from_k', 'a_to_b_from_c'}
30 {'a_to_b_from_w'}
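The reason for the read-modify-write in thread_f is that a DictProxy created by Manager().dict() only propagates changes made through its own item assignment; mutating a Groups object fetched from the proxy is invisible to the manager, so the object is fetched, updated and assigned back under the lock to avoid losing concurrent updates.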
Update
I may be off base here, but most of the work I see appears to be in the parsing of the input lines. In your real-world situation, whatever it is, that may be only a small fraction of your total processing (and if it is not, then, as I mentioned earlier in a comment, multiprocessing is not a good fit for this problem), but I see no reason not to move that processing into the multiprocessing pool itself.
I have greatly refactored the code, moving the line parsing into the Line class, and the Groups class is no longer needed, since the merging of the dictionaries and sets is done by the main process.
I am using the method imap_unordered instead of imap because it is generally slightly more efficient, and your previous code neither used the return value of the map call nor relied on the order in which results are generated. So the keys may be added to the dictionary in arbitrary order; why would you want an ordering within a dictionary to begin with?
You should be aware that if your input file has many lines and your worker function requires non-trivial processing, you can fill up the multiprocessing task queue faster than the processes can empty it, and you may run out of memory. I do have a solution for that, but that's another story.
import multiprocessing

class ProcessedLine():
    def __init__(self, text: str) -> None:
        self.text, group = text.strip().split()
        self.group = int(group)
        self.link = text.split('_from')[0]
        self.dict = {self.link: {self.group: set([self.text])}}

def process_line(text: str):
    processed_line = ProcessedLine(text)
    return processed_line

def compute_chunksize(iterable_size, pool_size):
    chunksize, remainder = divmod(iterable_size, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize

def main():
    def generate_lines():
        with open('dummy.txt') as f:
            for line in f:
                yield line

    ESTIMATED_NUMBER_OF_LINES_IN_FILE = 7
    POOL_SIZE = min(ESTIMATED_NUMBER_OF_LINES_IN_FILE, multiprocessing.cpu_count())
    # chunksize to be used with imap_unordered:
    chunksize = compute_chunksize(ESTIMATED_NUMBER_OF_LINES_IN_FILE, POOL_SIZE)
    pool = multiprocessing.Pool(POOL_SIZE)
    # Specify a chunksize value if the size of the iterable is large
    results = {}
    for processed_line in pool.imap_unordered(process_line, generate_lines(), chunksize=chunksize):
        link = processed_line.link
        if link not in results:
            # Just update with the entire dictionary
            results.update(processed_line.dict)
        else:
            # Update the set dictionary:
            set_dict = results[link]
            set_key = processed_line.group
            if set_key in set_dict:
                set_dict[set_key].add(processed_line.text)
            else:
                #set_dict[set_key] = set(processed_line.text)
                set_dict[set_key] = processed_line.dict[link][set_key]
    pool.close()
    pool.join()
    for group_name, groups in results.items():
        print(f'Group Name {group_name} has the following:')
        for k, v in groups.items():
            print(' ', k, '->', v)
        print()

if __name__ == "__main__":
    main()
Prints:
Group Name a_to_b has the following:
20 -> {'a_to_b_from_c', 'a_to_b_from_k'}
30 -> {'a_to_b_from_w'}
10 -> {'a_to_b_from_l'}
Group Name c_to_f has the following:
30 -> {'c_to_f_from_e'}
20 -> {'c_to_f_from_k'}
10 -> {'c_to_f_from_l'}
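As a side note, compute_chunksize reproduces essentially the same heuristic Pool.map uses internally to pick a default chunksize (the iterable length divided by four times the pool size, rounded up); imap_unordered defaults to a chunksize of 1, which is why a comparable value is passed explicitly here.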