Multiprocessing in Python uses only one process

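I'm trying to learn multiprocessing with Python. I wrote a simple piece of code that should feed each process 1000 lines from a txt input file. My main function reads a line, splits it, and then performs some very simple operations on the elements of the string. The final result should be written to an output file.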

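When I run it, 4 processes are correctly spawned, but only one of them is actually running, and at minimal CPU. As a result, the code is very slow, which defeats the purpose of using multiprocessing in the first place. I don't think I have a global list problem like in this question (python multiprocessing apply_async only uses one process), and I don't think my function is too trivial as in this case (Python multiprocessing.Pool() doesn't use 100% of each CPU).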
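One way to check the symptom described above is to have each task report which worker handled it via `os.getpid()`. A minimal sketch follows; `tag_with_pid` and `count_tasks_per_worker` are hypothetical helper names, and because the workload here is trivial the counts will not necessarily be evenly balanced.

```python
import os
from collections import Counter
from multiprocessing import Pool


def tag_with_pid(item):
    """Return the item together with the PID of the process that handled it."""
    return item, os.getpid()


def count_tasks_per_worker(results):
    """Count how many items each worker PID processed."""
    return Counter(pid for _item, pid in results)


if __name__ == "__main__":
    with Pool(4) as pool:
        # chunksize=1 hands items out one at a time so every worker can pick some up
        results = pool.map(tag_with_pid, range(100), chunksize=1)
    # A single PID here would mean only one worker actually did any work.
    print(count_tasks_per_worker(results))
```

If several PIDs show up but the run is still slow, the bottleneck is more likely in the parent process (reading the input, sending lines to the workers, or writing the output) than in the pool itself.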

I don't understand what I'm doing wrong; any help or suggestion is appreciated. Here is the basic code:

import multiprocessing
import itertools

def myfunction(line):
    returnlist = []
    list_of_elem = line.split(",")
    elem_id = list_of_elem[1]
    elem_to_check = list_of_elem[5]

    ids = list_of_elem[2].split("|")

    for x in itertools.permutations(ids, 2):
        # x is a tuple, so unpack it: str.join only accepts strings
        if x[1] == elem_to_check:
            returnlist.append(",".join([elem_id, *x, "1\n"]))
        else:
            returnlist.append(",".join([elem_id, *x, "0\n"]))

    return returnlist

def grouper(n, iterable, padvalue=None):
    # izip_longest is called zip_longest in Python 3
    return itertools.zip_longest(*[iter(iterable)]*n, fillvalue=padvalue)

if __name__ == '__main__':
    with open(r"my_input_file_to_be_processed.txt", "r") as infile:
        my_data = infile.read().split("\n")

    p = multiprocessing.Pool(4)

    for chunk in grouper(1000, my_data):
        # drop the None padding and empty lines, which myfunction cannot handle
        results = p.map(myfunction, [line for line in chunk if line])
        with open(r"my_output_file", "a") as outfile:
            for r in results:
                outfile.writelines(r)
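`str.join` accepts only strings, so each pair produced by `itertools.permutations` (a tuple) has to be unpacked into separate fields before joining. The sketch below traces one made-up input line through the same logic; the sample line and the name `rows_for_line` are hypothetical, and writing the two ids as two separate columns is an assumption about the intended output format.

```python
import itertools


def rows_for_line(line):
    """One output row per ordered pair of ids, flagged 1 if the second id matches field 5."""
    fields = line.rstrip("\n").split(",")
    elem_id = fields[1]
    elem_to_check = fields[5]
    ids = fields[2].split("|")
    rows = []
    for first, second in itertools.permutations(ids, 2):
        flag = "1" if second == elem_to_check else "0"
        # Assumption: the two ids become two separate comma-separated columns.
        rows.append(",".join([elem_id, first, second, flag]) + "\n")
    return rows


if __name__ == "__main__":
    for row in rows_for_line("a,id42,p|q|r,x,y,q"):
        print(row, end="")
```

A line with three ids yields 3 × 2 = 6 rows, so the output grows quadratically with the number of ids per line.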

Edit: I modified my code as suggested (removing the redundant data preprocessing). However, the problem still seems to persist.

import multiprocessing
import itertools

def myfunction(line):
    returnlist = []
    # lines read from a file object keep their trailing "\n"
    list_of_elem = line.rstrip("\n").split(",")
    elem_id = list_of_elem[1]
    elem_to_check = list_of_elem[5]

    ids = list_of_elem[2].split("|")

    for x in itertools.permutations(ids, 2):
        # x is a tuple, so unpack it: str.join only accepts strings
        if x[1] == elem_to_check:
            returnlist.append(",".join([elem_id, *x, "1\n"]))
        else:
            returnlist.append(",".join([elem_id, *x, "0\n"]))

    return returnlist

if __name__ == '__main__':
    with open(r"my_input_file_to_be_processed.txt", "r") as my_data, \
            open(r"my_output_file", "a") as outfile:
        p = multiprocessing.Pool(4)

        # map over the file object itself; chunksize sends 1000 lines per task
        results = p.map(myfunction, my_data, chunksize=1000)
        for r in results:
            outfile.writelines(r)
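A minimal sketch of the same idea with `Pool.imap` instead of `Pool.map`, assuming the row format of `myfunction` above: `imap` consumes the file lazily in batches of `chunksize` lines and yields results in input order, so the parent can write each batch as it arrives instead of holding everything in memory, and the output file is opened only once. `process_file` is a hypothetical name.

```python
import itertools
from multiprocessing import Pool


def myfunction(line):
    """The question's per-line function, with the permutation tuple unpacked before joining."""
    list_of_elem = line.rstrip("\n").split(",")
    elem_id = list_of_elem[1]
    elem_to_check = list_of_elem[5]
    ids = list_of_elem[2].split("|")
    return [
        ",".join([elem_id, *x, "1\n" if x[1] == elem_to_check else "0\n"])
        for x in itertools.permutations(ids, 2)
    ]


def process_file(in_path, out_path, processes=4, chunksize=1000):
    """Stream in_path through a worker pool and write all rows to out_path."""
    with open(in_path) as infile, open(out_path, "w") as outfile:
        lines = (line for line in infile if line.strip())  # skip blank lines
        with Pool(processes) as pool:
            # imap sends `chunksize` lines per task and yields results in input order
            for rows in pool.imap(myfunction, lines, chunksize=chunksize):
                outfile.writelines(rows)
```

Call it from under the `if __name__ == '__main__':` guard, e.g. `process_file("my_input_file_to_be_processed.txt", "my_output_file")`. Opening the output with `"w"` overwrites any previous run, unlike the append mode used in the question.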

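Based on your code snippet, I would do something like this: split the file into 8 chunks and then have 4 workers do the computation (why 8 chunks and 4 workers? Just an arbitrary choice for the example):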

from multiprocessing import Pool
import itertools
import math

def myfunction(lines):
    returnlist = []
    for line in lines:
        list_of_elem = line.split(",")
        elem_id = list_of_elem[1]
        elem_to_check = list_of_elem[5]
        ids = list_of_elem[2].split("|")

        for x in itertools.permutations(ids, 2):
            # x is a tuple of two ids; unpack it so join only sees strings
            returnlist.append(",".join(
                [elem_id, *x, "1\n" if x[1] == elem_to_check else "0\n"]))

    return returnlist

def chunk(it, size):
    it = iter(it)
    return iter(lambda: tuple(itertools.islice(it, size)), ())

if __name__ == "__main__":
    with open(r"my_input_file_to_be_processed.txt", "r") as infile:
        # splitlines() does not leave a trailing empty line behind
        my_data = infile.read().splitlines()

    # ceil (and at least 1) gives at most 8 chunks and never a chunk size of 0
    size = max(1, math.ceil(len(my_data) / 8))
    prep = list(chunk(my_data, size))
    with Pool(4) as p:
        res = p.map(myfunction, prep)

    # concatenate the per-chunk lists in order
    result = list(itertools.chain.from_iterable(res))
    print(result)  # ... or do something with the result
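The `chunk` helper above relies on the two-argument form `iter(callable, sentinel)`, which keeps calling the lambda until it returns the empty tuple. The sketch below isolates it together with two hypothetical helpers, `split_into` and `flatten`. Computing the chunk size with `math.ceil` and a floor of 1, rather than `round`, keeps the number of chunks at or below the requested count and avoids a size of 0 (with `islice(it, 0)` the very first tuple is already empty, so no chunks are produced at all). `itertools.chain.from_iterable` concatenates the per-chunk results in order without mutating the first list.

```python
import itertools
import math


def chunk(it, size):
    """Yield successive tuples of at most `size` items from `it`."""
    it = iter(it)
    # iter(callable, sentinel): call the lambda until it returns the sentinel ()
    return iter(lambda: tuple(itertools.islice(it, size)), ())


def split_into(items, parts):
    """Split a sequence into at most `parts` chunks of roughly equal size."""
    size = max(1, math.ceil(len(items) / parts))
    return list(chunk(items, size))


def flatten(list_of_lists):
    """Concatenate per-chunk result lists, preserving order."""
    return list(itertools.chain.from_iterable(list_of_lists))


if __name__ == "__main__":
    print(split_into(list(range(10)), 8))
    print(flatten([["a", "b"], [], ["c"]]))
```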

Edit: this assumes you are sure that all lines have the same format and will not cause errors.
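If you cannot be sure of that, one option is to separate well-formed lines from the rest before handing anything to the pool. A minimal sketch, assuming a line is usable when it has at least six comma-separated fields (the code reads index 5); `partition_lines` is a hypothetical name. It uses `str.splitlines()` because `text.split("\n")` leaves a trailing empty string when the text ends with a newline, and that empty line alone is enough to raise an `IndexError`.

```python
def partition_lines(text, min_fields=6):
    """Split the non-blank lines of `text` into (well_formed, malformed) lists."""
    well_formed, malformed = [], []
    for line in text.splitlines():
        if not line.strip():
            continue  # skip blank lines anywhere in the text
        if len(line.split(",")) >= min_fields:
            well_formed.append(line)
        else:
            malformed.append(line)
    return well_formed, malformed


if __name__ == "__main__":
    good, bad = partition_lines("a,b,c,d,e,f\n\nshort,line\n")
    print(good)
    print(bad)
```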

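Based on your comment, it may be useful to look for the problem in your function or in the contents of your file, either by testing without multiprocessing or by using a large/ugly try/except that is almost certain to produce some output (either the exception or the result):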

import itertools

def myfunction(lines):
    returnlist = []
    for line in lines:
        try:
            list_of_elem = line.split(",")
            elem_id = list_of_elem[1]
            elem_to_check = list_of_elem[5]
            ids = list_of_elem[2].split("|")

            for x in itertools.permutations(ids, 2):
                returnlist.append(",".join(
                    [elem_id, *x, "1\n" if x[1] == elem_to_check else "0\n"]))
        except Exception as err:
            # record the failing line instead of letting the exception abort the whole map call
            returnlist.append('I encountered error {} on line {}'.format(err, line))

    return returnlist
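For the other suggestion, testing without multiprocessing, a plain loop in the main process is enough to pinpoint which input lines fail, since each failure can be tied to a line number. A minimal sketch; `find_bad_lines` is a hypothetical helper that works with any per-line function, and `needs_six_fields` is only a stand-in for the real logic.

```python
def find_bad_lines(func, lines):
    """Call func on every line in the current process; return (line_number, line, error) for failures."""
    failures = []
    for number, line in enumerate(lines, start=1):
        try:
            func(line)
        except Exception as err:  # broad on purpose: we want to see every kind of failure
            failures.append((number, line, repr(err)))
    return failures


if __name__ == "__main__":
    # stand-in for the real per-line logic: it needs at least six comma-separated fields
    def needs_six_fields(line):
        return line.split(",")[5]

    sample = ["a,id,p|q,x,y,q", "too,short", ""]
    for number, line, error in find_bad_lines(needs_six_fields, sample):
        print(number, repr(line), error)
```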