Mutate a shared object in python Multiprocessing

Suppose there is a dummy.txt file that contains the following information:

a_to_b_from_c 20
a_to_b_from_w 30
a_to_b_from_k 20
a_to_b_from_l 10
c_to_f_from_e 30
c_to_f_from_k 20
c_to_f_from_l 10

(the only numeric values are 10, 20 and 30) and the following code:

import multiprocessing 

global conns 

class Line():
    def __init__(self, text, group) -> None:
        self.text = text 
        self.group = int(group)

    def get_link(self):
        return self.text.split('_from')[0]

    def __repr__(self):
        return f"<{self.group},{self.text}>"

class Groups():

    def __init__(self, name ) -> None: 
        self.name = name 
        self.groups = {k:set() for k in [10,20,30]}

    def add_to_dict(self,line : Line):
        
        connection = line.get_link()
        if connection not in self.groups.keys():
            self.groups[connection] = set()
        
        self.groups[connection].add(line.text)

def thread_f(item : Line):

    # Update the dictionary of every Group object accordingly
    global conns
 
    key = item.get_link()
     
    conns[key].add_to_dict(item)
    
def main():

    global conns 

    # Parse the file and store the information in an iterable
    with open('dummy.txt') as f:

        info = [ Line(*line.strip().split()) for line in f]

    # Update the global (shared) object and initialize a dictionary 
    # this has the following initialization: 
    # { a_to_b : set(), c_to_f : set() }
    conns = { k : Groups(k) for k in {x.get_link() for x in info} }

    # Update the shared object according to the iterable information
    with multiprocessing.Pool(5) as pool:

        res = pool.map(thread_f,     # add to the appropriate key the items 
                        info,        # the lines 
                        chunksize=1) # respect the order

    # Display the Results        
    for group_name, group_obj in conns.items():

        print(f"Grp Name {group_name} has the following:")

        for cost, connections in group_obj.groups.items():

            print(cost,connections)
        

if __name__ == "__main__":
    main()

What I want to do is first parse the file and generate a Line object for every line of the file. Once the parsing is done, I update the global variable conns, which I intend to use as a variable shared by all the workers in the pool. Then, in the thread_f function, I update the global variable (a dictionary) by adding the corresponding Line to the dictionary field of the appropriate Groups object.

The problem is that when I try to display the information, nothing shows up. Instead, I get a bunch of empty sets:

Grp Name a_to_b has the following:
10 set()
20 set()
30 set()
Grp Name c_to_f has the following:
10 set()
20 set()
30 set()

What I was expecting instead is:

Grp Name a_to_b has the following:
10 set(a_to_b_from_l)
20 set(a_to_b_from_c,a_to_b_from_k)
30 set(a_to_b_from_w)
Grp Name c_to_f has the following:
10 set(c_to_f_from_l)
20 set(c_to_f_from_k)
30 set(c_to_f_from_e)

Since Python multiprocessing effectively uses a fork approach, I know the child processes do have access to the information the parent process has already initialized, but their changes have no effect on the parent. After reading the documentation and searching on S.O., I found the Manager objects of the multiprocessing package. The problem is that I could not figure out how to produce an already-initialized Manager.dict() (the way I did for conns with a comprehension).
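
To illustrate the point, here is a minimal, hypothetical sketch (not part of the original question): every worker mutates its own fork-copied version of the global, and the parent's copy stays empty.

import multiprocessing

shared = {"hits": set()}

def worker(x):
    shared["hits"].add(x)          # mutates this child's copy only
    return sorted(shared["hits"])  # proves the child really did add it

if __name__ == "__main__":
    with multiprocessing.Pool(2) as pool:
        print(pool.map(worker, range(4)))  # each child saw only its own additions
    print(shared["hits"])                  # set() -- the parent's copy is untouched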

How can I achieve the desired behaviour described above?

Yes, but why multiprocessing?

Well, this example is just an MWE I created to mimic what my actual code does. In reality, I am trying to speed up code that does not scale well to very large input files.

Regarding Manager

There is a somewhat similar question here [1], but I did not manage to find a way to initialize a Manager.dict() from a pre-existing, i.e. already initialized, dictionary and pass it to the spawned processes. Hence, I use sets to guarantee there will be no duplicate entries, and a global, already-initialized variable that is continuously updated by the processes.
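
For what it is worth, the answer below shows that a Manager().dict() can in fact be seeded from a pre-existing dictionary, since it accepts a mapping as its initializer. A minimal sketch:

import multiprocessing

if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        seed = {"a_to_b": set(), "c_to_f": set()}
        conns = manager.dict(seed)  # seeded from an already-initialized dict
        print(dict(conns))          # {'a_to_b': set(), 'c_to_f': set()}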

A workaround for the proposed Manager approach?

Well, since the computational effort increases with the use of a shared resource (i.e. the Manager.dict object), would the following workaround be possible?

The idea is:

So, suppose our pool of worker processes contains X processes in total. The task of each worker is to classify each Line into the appropriate Groups object; the Lines are handed to the workers as list[Line] objects. So what if we used a divide-and-conquer approach like this:

+--------------------------------------------------------------+
|                        list[Line]                            |
+--------------------------------------------------------------+
|        |        |        |        |        |        |        |
|        |        |        |        |        |        |        |
  <-d->    <-d->    <-d->     ...                       <-d->

The list is split into X independent chunks/slices/sublists of size len(list)/X. Then each of these iterables is handed to a worker for processing. But now the thread_f function has to be modified accordingly.

It will classify each Line of its own sublist into a local dictionary, rather than updating a shared object. After the pool processes finish, the results, i.e. the per-worker dictionaries, must be merged into one final solution.
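
A minimal sketch of this divide-and-conquer idea (hypothetical, not from the original post; it assumes the Line class shown above and X worker processes):

import multiprocessing

def classify_chunk(lines):
    # Each worker classifies its own sublist into a private, local dict:
    # { link : { group : set(texts) } }
    local = {}
    for line in lines:
        local.setdefault(line.get_link(), {}).setdefault(line.group, set()).add(line.text)
    return local

def merge(partials):
    # The parent merges the per-worker dictionaries into one final solution.
    final = {}
    for partial in partials:
        for link, groups in partial.items():
            for cost, texts in groups.items():
                final.setdefault(link, {}).setdefault(cost, set()).update(texts)
    return final

# usage, assuming `info` is the parsed list[Line]:
# d = -(-len(info) // X)                         # ceil(len(info) / X)
# chunks = [info[i:i + d] for i in range(0, len(info), d)]
# with multiprocessing.Pool(X) as pool:
#     final = merge(pool.map(classify_chunk, chunks))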

First, I believe your approach in Groups.add_to_dict is wrong. I have commented out the erroneous statement and followed it with the correct one:

import multiprocessing

def init_processes(d, the_lock):
    global conns, lock
    conns, lock = d, the_lock

class Line():
    def __init__(self, text, group) -> None:
        self.text = text
        self.group = int(group)

    def get_link(self):
        return self.text.split('_from')[0]

    def __repr__(self):
        return f"<{self.group},{self.text}>"

class Groups():

    def __init__(self, name ) -> None:
        self.name = name
        self.groups = {k:set() for k in [10,20,30]}

    def add_to_dict(self,line : Line):

        #connection = line.get_link()
        connection = line.group
        if connection not in self.groups.keys():
            self.groups[connection] = set()

        self.groups[connection].add(line.text)


def thread_f(item : Line):

    # Update the dictionary of every Group object accordingly
    global conns # Not strictly necessary

    key = item.get_link()

    # We need to let the managed dict know there is an updated value for the key:
    """
    conns[key].add_to_dict(item)
    """
    with lock:
        the_group = conns[key]       # fetch a local copy of the Groups object
        the_group.add_to_dict(item)  # mutate the copy
        conns[key] = the_group       # reassign so the managed dict sees the update

def main():

    # Parse the file and store the information in an iterable
    with open('dummy.txt') as f:

        info = [ Line(*line.strip().split()) for line in f]

    # Update the global (shared) object and initialize a dictionary
    # this has the following initialization:
    # { a_to_b : set(), c_to_f : set() }
    conns = multiprocessing.Manager().dict(
        { k : Groups(k) for k in {x.get_link() for x in info} }
    )

    # Update the shared object according to the iterable information
    lock = multiprocessing.Lock()
    with multiprocessing.Pool(5, initializer=init_processes, initargs=(conns, lock)) as pool:

        res = pool.map(thread_f,     # add to the appropriate key the items
                        info,        # the lines
                        chunksize=1) # respect the order

    # Display the Results
    for group_name, group_obj in conns.items():

        print(f"Grp Name {group_name} has the following:")

        for cost, connections in group_obj.groups.items():

            print(cost,connections)


if __name__ == "__main__":
    main()

Prints:

Grp Name c_to_f has the following:
10 {'c_to_f_from_l'}
20 {'c_to_f_from_k'}
30 {'c_to_f_from_e'}
Grp Name a_to_b has the following:
10 {'a_to_b_from_l'}
20 {'a_to_b_from_k', 'a_to_b_from_c'}
30 {'a_to_b_from_w'}
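
The read-modify-write under the lock is needed because indexing a managed dict returns a pickled copy of the stored object; mutating that copy in place never reaches the manager process. A minimal standalone sketch of the pitfall:

import multiprocessing

if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        d = manager.dict({"key": set()})
        d["key"].add(1)  # mutates a local copy; the managed dict never sees it
        print(d["key"])  # set()
        s = d["key"]     # read ...
        s.add(1)         # ... modify ...
        d["key"] = s     # ... write back, triggering the proxy's __setitem__
        print(d["key"])  # {1}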

Update

I may be off base here, but it seems to me that most of the work is in the parsing of the input lines. In your real-world case, whatever it is, parsing may only be a small fraction of your total processing (and if it is not, then, as I mentioned earlier in a comment, multiprocessing is not suited to this problem), but I see no reason why that parsing should not be moved into the multiprocessing pool itself.

I have greatly refactored the code, moving the line parsing into the Line class (ProcessedLine in the code below); the Groups class is no longer required since the merging of the dictionaries and sets is done by the main process.

I use the method imap_unordered rather than imap because it is generally slightly more efficient, and your previous code did not use the return value of the map call anyway, nor did it depend on the order in which results are generated. So keys can be added to the dictionary in arbitrary order. Why should there be an order to the dictionary to begin with?

You should be aware that if your input file has many lines and your worker function requires non-trivial processing, you can fill the multiprocessing task queue faster than the processes can empty it and you could run out of memory. I do have a solution for that, but that's another story.
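
One common way to bound memory in that situation (not necessarily the solution alluded to above) is to feed the pool in fixed-size batches, so that only a limited number of tasks is ever queued. A hypothetical sketch, reusing the process_line and generate_lines helpers from the refactored code below:

import itertools
import multiprocessing

def bounded_imap_unordered(pool, func, iterable, batch_size=10_000, chunksize=1):
    # Submit at most batch_size tasks at a time instead of the whole iterable.
    it = iter(iterable)
    while True:
        batch = list(itertools.islice(it, batch_size))
        if not batch:
            break
        yield from pool.imap_unordered(func, batch, chunksize=chunksize)

# usage:
# with multiprocessing.Pool() as pool:
#     for processed_line in bounded_imap_unordered(pool, process_line, generate_lines()):
#         ...  # merge as in the refactored code

With that caveat noted, here is the refactored code: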

import multiprocessing


class ProcessedLine():
    def __init__(self, text : str) -> None:
        self.text, group = text.strip().split()
        self.group = int(group)
        self.link = self.text.split('_from')[0]  # e.g. 'a_to_b'
        self.dict = {self.link: {self.group: set([self.text])}}

def process_line(text : str):
    processed_line = ProcessedLine(text)
    return processed_line

def compute_chunksize(iterable_size, pool_size):
    chunksize, remainder = divmod(iterable_size, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize

def main():

    def generate_lines():
        with open('dummy.txt') as f:
            for line in f:
                yield line

    ESTIMATED_NUMBER_OF_LINES_IN_FILE = 7
    POOL_SIZE = min(ESTIMATED_NUMBER_OF_LINES_IN_FILE, multiprocessing.cpu_count())
    # chunksize to be used with imap_unordered:
    chunksize = compute_chunksize(ESTIMATED_NUMBER_OF_LINES_IN_FILE, POOL_SIZE)
    pool = multiprocessing.Pool(POOL_SIZE)
    # Specify a chunksize value if the size of the iterable is large
    results = {}
    for processed_line in pool.imap_unordered(process_line, generate_lines(), chunksize=chunksize):
        link = processed_line.link
        if link not in results:
            # Just update with the entire dictionary
            results.update(processed_line.dict)
        else:
            # Update the set dictionary:
            set_dict = results[link]
            set_key = processed_line.group
            if set_key in set_dict:
                set_dict[set_key].add(processed_line.text)
            else:
                # set(processed_line.text) would be wrong: set() of a string
                # is a set of its characters, so reuse the one-element set
                # already built by ProcessedLine:
                set_dict[set_key] = processed_line.dict[link][set_key]
    pool.close()
    pool.join()

    for group_name, groups in results.items():
        print(f'Group Name {group_name} has the following:')
        for k, v in groups.items():
            print('   ', k, '->', v)
        print()

if __name__ == "__main__":
    main()

Prints:

Group Name a_to_b has the following:
    20 -> {'a_to_b_from_c', 'a_to_b_from_k'}
    30 -> {'a_to_b_from_w'}
    10 -> {'a_to_b_from_l'}

Group Name c_to_f has the following:
    30 -> {'c_to_f_from_e'}
    20 -> {'c_to_f_from_k'}
    10 -> {'c_to_f_from_l'}