Multiprocessing in Python uses only one process
I am trying to learn multiprocessing with Python.
I wrote a simple piece of code that should feed 1000 lines from a txt input file to each process. My main function reads a line, splits it, and then performs some very simple operations on the elements of the string. The final result should be written to an output file.
When I run it, 4 processes are correctly spawned, but only one of them is actually running, with minimal CPU. As a result the code is very slow, defying the purpose of using multiprocessing in the first place.
I don't think I have the global-list problem described in this question (python multiprocessing apply_async only uses one process), and I don't think my function is too trivial, as in this case (Python multiprocessing.Pool() doesn't use 100% of each CPU).
I don't understand what I'm doing wrong; any help/suggestion is appreciated. Here is the basic code:
import multiprocessing
import itertools

def myfunction(line):
    returnlist = []
    list_of_elem = line.split(",")
    elem_id = list_of_elem[1]
    elem_to_check = list_of_elem[5]
    ids = list_of_elem[2].split("|")
    for x in itertools.permutations(ids, 2):
        if x[1] == elem_to_check:
            returnlist.append(",".join([elem_id, x, "1\n"]))
        else:
            returnlist.append(",".join([elem_id, x, "0\n"]))
    return returnlist

def grouper(n, iterable, padvalue=None):
    return itertools.izip_longest(*[iter(iterable)]*n, fillvalue=padvalue)

if __name__ == '__main__':
    my_data = open(r"my_input_file_to_be_processed.txt", "r")
    my_data = my_data.read().split("\n")
    p = multiprocessing.Pool(4)
    for chunk in grouper(1000, my_data):
        results = p.map(myfunction, chunk)
        for r in results:
            with open(r"my_output_file", "ab") as outfile:
                outfile.write(r)
EDIT
I modified my code as suggested (deleting the redundant data pre-processing). However, the problem seems to persist.
import multiprocessing
import itertools

def myfunction(line):
    returnlist = []
    list_of_elem = line.split(",")
    elem_id = list_of_elem[1]
    elem_to_check = list_of_elem[5]
    ids = list_of_elem[2].split("|")
    for x in itertools.permutations(ids, 2):
        if x[1] == elem_to_check:
            returnlist.append(",".join([elem_id, x, "1\n"]))
        else:
            returnlist.append(",".join([elem_id, x, "0\n"]))
    return returnlist

if __name__ == '__main__':
    my_data = open(r"my_input_file_to_be_processed.txt", "r")
    p = multiprocessing.Pool(4)
    results = p.map(myfunction, my_data, chunksize=1000)
    for r in results:
        with open(r"my_output_file", "ab") as outfile:
            outfile.write(r)
Based on your snippet of code I guess I would do something like this, splitting the file into 8 parts and then having the computation done by 4 workers (why 8 chunks and 4 workers? Just a random choice I made for the example):
from multiprocessing import Pool
import itertools

def myfunction(lines):
    returnlist = []
    for line in lines:
        list_of_elem = line.split(",")
        elem_id = list_of_elem[1]
        elem_to_check = list_of_elem[5]
        ids = list_of_elem[2].split("|")
        for x in itertools.permutations(ids, 2):
            # x is a tuple of two ids, so unpack it before joining
            returnlist.append(",".join(
                [elem_id, *x, "1\n" if x[1] == elem_to_check else "0\n"]))
    return returnlist

def chunk(it, size):
    it = iter(it)
    return iter(lambda: tuple(itertools.islice(it, size)), ())

if __name__ == "__main__":
    my_data = open(r"my_input_file_to_be_processed.txt", "r")
    my_data = my_data.read().split("\n")
    prep = [strings for strings in chunk(my_data, round(len(my_data) / 8))]
    with Pool(4) as p:
        res = p.map(myfunction, prep)
    result = res.pop(0)
    _ = list(map(lambda x: result.extend(x), res))
    print(result)  # ... or do something with the result
EDIT: This assumes you are confident that all the lines have the same format and will not cause errors.
Following your comments, it might be useful to look for the problem in the function/the contents of your file, either by testing it without multiprocessing, or with a try/except, in a way that is almost certain to produce an output (either the exception or the result), since your file is large/ugly:
def myfunction(lines):
    returnlist = []
    for line in lines:
        try:
            list_of_elem = line.split(",")
            elem_id = list_of_elem[1]
            elem_to_check = list_of_elem[5]
            ids = list_of_elem[2].split("|")
            for x in itertools.permutations(ids, 2):
                # x is a tuple of two ids, so unpack it before joining
                returnlist.append(",".join(
                    [elem_id, *x, "1\n" if x[1] == elem_to_check else "0\n"]))
        except Exception as err:
            returnlist.append('I encountered error {} on line {}'.format(err, line))
    return returnlist