How to use multiprocessing for parsing and processing large XML files?

I have a large XML file containing newspaper articles, and I want to tokenize these articles efficiently using multiprocessing.

The XML file is very simple and looks like this:

<?xml version="1.0" encoding="utf-8"?>
<corpus>
    <text id="1">
        <body>
            <title>Some headline words</title>
            <p>A sentence. Another sentence.</p>
            <p>Third sentence.</p>
        </body>
    </text>
    <text id="2">
        <body>
            <title>Some other headline words</title>
            <p>A stupid sentence. Another stupid sentence.</p>
            <p>Third stupid sentence.</p>
        </body>
    </text>
    ...
</corpus>

So far, I have parsed the file with iterparse from ElementTree. However, since the task is so highly parallel, I have been considering using multiprocessing on top of that. I want to keep the low memory consumption of iterparse, but split the processing of the text elements across multiple processors. Order does not matter to me.
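Roughly, the sequential baseline looks like this (a minimal sketch; the inline sample and the whitespace tokenizer are stand-ins for the real corpus):

```python
# Sequential baseline: iterparse streams the file, and clearing each
# handled <text> element keeps memory usage flat.
import io
import xml.etree.ElementTree as ET

sample = """<?xml version="1.0" encoding="utf-8"?>
<corpus>
    <text id="1"><body><title>Some headline words</title><p>A sentence.</p></body></text>
    <text id="2"><body><title>Other words</title><p>Third sentence.</p></body></text>
</corpus>"""

def tokenize_corpus(source):
    results = {}
    for event, elem in ET.iterparse(source, events=("end",)):
        if elem.tag == "text":
            body = elem.find("./body")
            # Tokenize every child of <body> (the <title> and <p> elements).
            results[elem.get("id")] = [child.text.split(" ") for child in body]
            elem.clear()  # free the subtree we just processed
    return results

tokens = tokenize_corpus(io.StringIO(sample))
```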

I tried it with the code below, but it exits with TypeError: 'NoneType' object is not iterable:

def text_to_tokens(text_elem):
    text_id = text_elem.get("id")

    tokens = [tokenize(elem.text) for elem in text_elem.find("./body")]

    with open(f"{text_id}.txt", "w") as file:
        file.write(str(tokens))

def tokenize(string):
    return string.split(" ")


if __name__ == "__main__":

    with multiprocessing.Pool(processes=4) as pool:

        context = iter(ET.iterparse("corpus_file.xml", events=("start", "end")))
        event, root = next(context)

        for event, elem in context:
            if event == "end" and elem.tag == "text":
                pool.map(text_to_tokens, elem)
                root.clear()

How do I get the multiprocessing to work? Any other tips or approaches are greatly appreciated; I just want to parallelize this task and make it as fast as possible.

The full traceback looks like this:

multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "/home/daniel/miniconda3/lib/python3.7/multiprocessing/pool.py", line 121, in worker
    result = (True, func(*args, **kwds))
  File "/home/daniel/miniconda3/lib/python3.7/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "stack_overflow_test.py", line 15, in text_to_tokens
    tokens = [tokenize(elem.text) for elem in text_elem]
TypeError: 'NoneType' object is not iterable
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "stack_overflow_test.py", line 30, in <module>
    pool.map(text_to_tokens, elem)
  File "/home/daniel/miniconda3/lib/python3.7/multiprocessing/pool.py", line 268, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/home/daniel/miniconda3/lib/python3.7/multiprocessing/pool.py", line 657, in get
    raise self._value
TypeError: 'NoneType' object is not iterable

What happens if you try this code without with multiprocessing.Pool(processes=4) as pool:?

Does it work?

If not, the problem lies outside your use of multiprocessing.

If it does, try completing the iterator initialization before creating the pool, i.e. put the following before the pool is initialized:

    context = iter(ET.iterparse("corpus_file.xml", events=("start", "end")))
    event, root = next(context)

Multiprocessing pickles objects, so if the iterator is initialized after the pool is created, it may not be passed correctly.
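To illustrate the pickling constraint (my example, not part of the comment above): anything handed to a worker must survive a pickle round-trip, and the worker receives an equivalent copy, never the same object. A robust workaround when an object pickles poorly is to serialize it to bytes first and reparse it on the other side:

```python
import pickle
import xml.etree.ElementTree as ET

elem = ET.fromstring('<text id="1"><body><p>A sentence.</p></body></text>')

# Serialize the element to bytes before sending it to a worker: bytes
# always pickle cleanly, and the worker can reparse them into an
# equivalent Element on its side.
payload = ET.tostring(elem)
received = ET.fromstring(pickle.loads(pickle.dumps(payload)))
```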

Separate the token extraction from the file creation. Use concurrent.futures to handle the concurrency details.

Change text_to_tokens to account for the body element and return the data instead of creating a file.

def text_to_tokens(text_elem):
    text_id = text_elem.get("id")
    fname = f"{text_id}.txt"
    tokens = []
    for elem in text_elem.iter():
        # Skip the container elements; only <title> and <p> carry text.
        if elem.tag in ("text", "body"):
            continue
        tokens.append(tokenize(elem.text))
    return fname, tokens
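A quick sanity check of the reworked function on the first <text> element from the sample corpus (repeated here with both helpers so it runs standalone):

```python
import xml.etree.ElementTree as ET

def tokenize(string):
    return string.split(" ")

def text_to_tokens(text_elem):
    text_id = text_elem.get("id")
    fname = f"{text_id}.txt"
    tokens = []
    for elem in text_elem.iter():
        # Skip the container elements; only <title> and <p> carry text.
        if elem.tag in ("text", "body"):
            continue
        tokens.append(tokenize(elem.text))
    return fname, tokens

elem = ET.fromstring(
    '<text id="1"><body>'
    "<title>Some headline words</title>"
    "<p>A sentence. Another sentence.</p>"
    "</body></text>"
)
fname, tokens = text_to_tokens(elem)
# fname is "1.txt"; tokens holds one token list per <title>/<p> element.
```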

And a new function to create the files:

def write(fname, data):
    with open(fname, "w") as f:
        f.write(str(data))

Extract the tokens in separate processes and write the files in threads:

if __name__ == "__main__":
    context = iter(ET.iterparse(f, events=("start", "end")))
    event, root = next(context)
    token_futures = []
    write_futures = []
    with ProcPoolExc() as ppe, ThreadPoolExc() as tpe:
        for event, elem in context:
            if event == "end" and elem.tag == "text":
                token_futures.append(ppe.submit(text_to_tokens, elem))
        for future in concurrent.futures.as_completed(token_futures):
            fname, data = future.result()
            write_futures.append(tpe.submit(write, fname, data))
    for fut in concurrent.futures.as_completed(write_futures):
        e = fut.exception()
        print('success' if not e else e)

Setup:

import concurrent.futures, io
import xml.etree.ElementTree as ET

ProcPoolExc = concurrent.futures.ProcessPoolExecutor
ThreadPoolExc = concurrent.futures.ThreadPoolExecutor

s = '''<?xml version="1.0" encoding="utf-8"?>
<corpus>
    <text id="1">
        <body>
            <title>Some headline words</title>
            <p>A sentence. Another sentence.</p>
            <p>Third sentence.</p>
        </body>
    </text>
    <text id="2">
        <body>
            <title>Some other headline words</title>
            <p>A stupid sentence. Another stupid sentence.</p>
            <p>Third stupid sentence.</p>
        </body>
    </text>
    <text id="3">
        <body>
            <title>Some other headline words</title>
            <p>A stupid sentence. Another stupid sentence.</p>
            <p>Third stupid sentence.</p>
        </body>
    </text>
    <text id="4">
        <body>
            <title>Some other headline words</title>
            <p>A stupid sentence. Another stupid sentence.</p>
            <p>Third stupid sentence.</p>
        </body>
    </text>
</corpus>'''

f = io.StringIO(s)

def tokenize(string):
    return string.split(" ")
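A note on the root cause of the original TypeError (my reading of the traceback): Pool.map iterates over its second argument, and iterating an Element yields its *children*, so pool.map(text_to_tokens, elem) handed each worker a <body> element rather than the <text> element; <body> has no "./body" child, find() returned None, and iterating None raised the error. A small demonstration:

```python
import xml.etree.ElementTree as ET

elem = ET.fromstring(
    '<text id="1"><body><title>T</title><p>P</p></body></text>'
)

# Iterating an Element yields its children -- here the single <body>
# element -- which is what pool.map passed to each worker.
children = list(elem)
body = children[0]
# body.find("./body") is None, and iterating None raises the TypeError
# shown in the traceback.
```

The fix in either approach is to submit the <text> element itself as one task, as the executor-based version does with ppe.submit(text_to_tokens, elem).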