BrokenPipeError: [Errno 32] Python Multiprocessing

BrokenPipeError: [Errno 32] Python Multiprocessing

我正在做一个网络抓取项目,但是处理数据需要花费很多时间,我想出了一个替代途径来抓取被抓取产品的源代码,然后单独处理数据。

我所做的是,将每个产品的源代码分别存储在一个数组中的一个元组中,并将该数组数据保存在一个文本文件中,以供以后进一步处理。我将数据保存为 10,000 个产品的块。每个文本文件大约10GB。

当我开始使用 multiprocessing 处理数据时,我不断遇到 BrokenPipeError:[错误 32],最初我是在 windows 机器上处理数据,我探索了一下发现 Linux更擅长管理内存,这个错误是因为在处理过程中完全利用了内存。

最初,我将处理后的数据存储在一个数组中(没有为每个产品在 运行 时间保存数据),我在堆栈论坛上看到我需要保存处理后的数据,因为处理后的数据占用了所有内存,我相应地更改了代码,将 map 更改为 imap,虽然它 运行 更长但仍然出现相同的错误。

这是我的代码,我不会发布完整的处理步骤,因为它只会增加代码的长度。

需要注意的是每个产品在处理时都有大量的数组数据,每个单独的数组多达18000个元素。

我使用的是带有 16GB 内存和 500GB 固态硬盘的八核处理器。

如有任何帮助,我们将不胜感激。谢谢!

import xml.etree.cElementTree as ET
from lxml import html  
import openpyxl
from openpyxl import Workbook
from lxml import etree
from lxml.etree import tostring
import pathos.multiprocessing as mp
import multiprocessing
import ast

global sourceDataList
sourceDataList=[]
global trackIndex
trackIndex=1
global failList
failList=[]


def processData(data):

    vehicalData=[]
    oemData=[]
    appendIndex=0

    #geting product link form incoming data list (tupile)
    p=data[0][1]
    #geting html source code form incoming data list(tupile)
    #converting it to html element
    source_code=html.fromstring(data[0][0])

    #processing data
    try:
        firstOem=source_code.xpath("//div[@id='tab-review']//tr[2]/td[2]")
        firstOem=firstOem[0].text_content().strip()
    except:
        firstOem=''
    try:
        name=source_code.xpath("//div[@id='right_title']/h1")
        name=name[0].text_content().strip()
    except:
        name=''

    #saving data in respective arrays
    vehicalData.append([firstOem,p,name,productType,brand,mfgNumber,imgOne,imgTwo,imgThree,imgFour,imgFive])
    for q in dayQtyPrice:
        vehicalData[appendIndex].append(q)
    vehicalData[appendIndex].append(specString)
    vehicalData[appendIndex].append(subAssembltString)
    vehicalData[appendIndex].append(parentAssemblyString)
    vehicalData[appendIndex].append(otherProductString)
    vehicalData[appendIndex].append(description)
    vehicalData[appendIndex].append(placement)
    for dma in makeModelArray:
        vehicalData[appendIndex].append(dma)        
    oemData.append([firstOem,name,productType,brand,mfgNumber,p])   
    for o in oemArray:
        oemData[appendIndex].append(o)

    print('Done !',p,len(vehicalData[0]),len(oemData[0]))

    #returning both arrays
    return (vehicalData,oemData)

def main():
    productLinks=[]
    vehicalData=[]
    oemData=[]
    
    #opening text file for processing list data
    with open('test.txt', encoding='utf-8') as f:
        string=f.read()

    sourceDataList=ast.literal_eval(string)
    print('Number of products:',len(sourceDataList))

    #creating pool and initiating multiprocessing
    p = mp.Pool(4)  # Pool tells how many at a time

    #opening and saving data at run time
    vehicalOutBook=openpyxl.load_workbook('vehical_data_file.xlsx')
    vehicalOutSheet=vehicalOutBook.active
    oemOutBook=openpyxl.load_workbook('oem_data_file.xlsx')
    oemOutSheet=oemOutBook.active
    for d in p.imap(processData, sourceDataList):
        v=d[0][0][:18000]
        o=d[1][0][:18000]
        vehicalOutSheet.append(v)
        oemOutSheet.append(o)

    p.terminate()
    p.join()

    #saving data
    vehicalOutBook.save('vehical_data_file.xlsx')
    oemOutBook.save('oem_data_file.xlsx')

if __name__=='__main__':
    main()

我不熟悉 pathos.multiprocessing.Pool class,但我们假设它的工作原理与 multiprocess.pool.Pool class 大致相同。问题是 test.txt 中的数据格式似乎必须读入整个文件才能用 ast.liter_eval 解析它,因此 [=15= 不能节省存储空间].

要有效地使用 imap(或 imap_unordered),而不是在文件 test.txt 中存储 list 的表示(JSON?),存储 多个 产品表示,由换行符分隔,可以单独解析,以便可以逐行读取和解析文件,而不是生成单个产品。您应该大致知道有多少行以及需要提交给 imap 的任务数量。这样做的原因是,当您有大量任务时,使用默认 chunksize 参数值 1 以外的值会更有效。我在下面包含了一个函数来沿着 map 函数将使用的行计算一个 chunksize 值。此外,您的工作函数 processData 似乎过多地使用了一层嵌套列表。我也恢复使用标准 multiprocessing.pool.Pool class 因为我或多或少知道它是如何工作的。

注意:我没有看到 processData 变量 makeModelArrayoemArray 的定义位置。

import xml.etree.cElementTree as ET
from lxml import html
import openpyxl
from openpyxl import Workbook
from lxml import etree
from lxml.etree import tostring
#import pathos.multiprocessing as mp
import multiprocessing
import ast

global sourceDataList
sourceDataList=[]
global trackIndex
trackIndex=1
global failList
failList=[]


def processData(data):

    #geting product link form incoming data list (tupile)
    p=data[0][1]
    #geting html source code form incoming data list(tupile)
    #converting it to html element
    source_code=html.fromstring(data[0][0])

    #processing data
    try:
        firstOem=source_code.xpath("//div[@id='tab-review']//tr[2]/td[2]")
        firstOem=firstOem[0].text_content().strip()
    except:
        firstOem=''
    try:
        name=source_code.xpath("//div[@id='right_title']/h1")
        name=name[0].text_content().strip()
    except:
        name=''

    #saving data in respective arrays
    vehicalData = [firstOem,p,name,productType,brand,mfgNumber,imgOne,imgTwo,imgThree,imgFour,imgFive]
    for q in dayQtyPrice:
        vehicalData,append(q)
    vehicalData,append(specString)
    vehicalData.append(subAssembltString)
    vehicalData.append(parentAssemblyString)
    vehicalData.append(otherProductString)
    vehicalData.append(description)
    vehicalData.append(placement)
    for dma in makeModelArray:
        vehicalData.append(dma)
    oemData = [firstOem,name,productType,brand,mfgNumber,p]
    for o in oemArray:
        oemData.append(o)

    #print('Done !',p,len(vehicalData),len(oemData))

    #returning both arrays
    return (vehicalData,oemData)

def generate_source_data_list():
    #opening text file for processing list data
    with open('test.txt', encoding='utf-8') as f:
        for line in f:
            # data for just one product:
            yield ast.literal_eval(line)

def compute_chunksize(iterable_size, pool_size):
    chunksize, remainder = divmod(iterable_size, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize


def main():
    #creating pool and initiating multiprocessing
    # use pool size equal to number of cores you have:
    pool_size = multiprocessing.cpu_count()
    # Approximate number of elements generate_source_data_list() will yield:
    NUM_TASKS = 100_000 # replace with actual number
    p = multiprocessing.Pool(pool_size)
    chunksize = compute_chunksize(NUM_TASKS, pool_size)

    #opening and saving data at run time
    vehicalOutBook=openpyxl.load_workbook('vehical_data_file.xlsx')
    vehicalOutSheet=vehicalOutBook.active
    oemOutBook=openpyxl.load_workbook('oem_data_file.xlsx')
    oemOutSheet=oemOutBook.active
    for d in p.imap(processData, generate_source_data_list(), chunksize=chunksize):
        v = d[0][:18000]
        o = d[1][:18000]
        vehicalOutSheet.append(v)
        oemOutSheet.append(o)

    p.terminate()
    p.join()

    #saving data
    vehicalOutBook.save('vehical_data_file.xlsx')
    oemOutBook.save('oem_data_file.xlsx')

if __name__=='__main__':
    main()

最终的电子表格仍需要大量存储空间!现在,如果您要输出两个 csv 文件,那就另当别论了——您可以边写边写。