使用 SAX 将大型 XML 文件划分为 Python 中的子文件

Partition large XML files into subfiles in Python using SAX

我需要解析非常大的XML文件(在3-5GB范围内),必须根据XML节点中包含的数据拆分成几个较小的XML文件.

每个输入文件都包含数十万 <measure> 个元素,就像这个(非常)简化的片段。

    <items>
        <measure code="0810">
            <condition sequ="001" SID="-5041162"/>
            <footnote Id="00550"/>
            <footnote Id="00735"/>
        </measure>
        <measure code="6304">
            <component Id="01" national="1"/>
            <footnote Id="00001"/>
        </measure>
        <measure code="0811">
            <condition sequ="002" SID="-5041356"/>
            <footnote Id="00555"/>
        </measure>
        <measure code="2915">
            <component Id="01" national="0"/>
            <certif SID="-737740"/>
            <certif SID="-737780"/>
        </measure>
    </items>

实际 <measure> 元素的内容几乎可以是任何格式正确的 XML。

我在解析这些文件时需要做两个过程:

  1. <measure>个元素的内容中提取信息,并转储 它到 MongoDB 数据库(这部分已解决...)
  2. 将原始 XML 文件分成 100 个 XML 个子文件 每个 <measure> 的 "code" 属性的前两位数字 节点。也就是说,新的 100 XML 个文件(命名为 'part_00.xml' 'part_99.xml') 需要创建并且每个 <measure> 元素必须是 附加到相应的子文件。 IE。 <measure> 示例中的块 1 和 3 应复制到 'part_08.xml',块 2 应复制到 'part_63.xml'...

我正在使用 SAX 来解析原始文件,上面的过程 1 运行良好。 SAX 进程的纯框架是:

    import sys
    from xml.sax import ContentHandler
    from xml.sax import make_parser

    class ParseMeasures(ContentHandler):
        code = ''

        def startElement(self, name, attrs):
            if name == 'measure':
                self.code = attrs.get('code')

        def endElement(self, name):
            if name == 'measure':
                print('***Must append <measure> block to file part_{0}.xml'.format(self.code[:2]))

    def main(args):
        handler = ParseMeasures()
        sax_parser = make_parser()
        sax_parser.setContentHandler(handler)
        sax_parser.parse('my_large_xml.file.xml')
        print('Ended')

    if __name__ == '__main__':
        main(sys.argv[1:])

我需要的是能够访问 'endElement()' 中的整个 <measure> XML 元素,以将其附加到相应的子文件。

有没有办法将 SAX 与其他 XML 解析功能相结合,从而获得 'endElement()' 中的整个 <measure> XML 元素? (我可以处理子文件的创建和管理......这不是问题!)

或者 SAX 方法在这种情况下不是最合适的,开始?

"only" 警告是进程应处理 3-5GB 范围内的输入文件...

以下是混合方案,使用内置SAX解析器生成解析事件,lxml构建部分树(仅<measure>个元素,一次仅一个)。

一旦构建了元素,它就会被 lxml 的 API for incremental XML generation 序列化到不同的文件中,具体取决于 @code.

的值

此代码处理 <measure> 元素内的任何级别的嵌套,以及包括空格在内的文本值。它目前不处理注释、处理指令或命名空间,但可以添加对这些的支持。

即使输入文件很大,内存消耗也应该保持在较低水平。 lxml会增加一些开销,但是支持迭代编写还是挺方便的。完全手动执行此操作总体上会更快,但也更复杂。

from xml.sax import ContentHandler, make_parser
from lxml import etree

class ParseMeasures(ContentHandler):
    def __init__(self):
        self.stack = []
        self.open = False
        self.elem = None
        self.writers = {}
        self.text = []

    def _get_writer(self, filename):
        with etree.xmlfile(filename) as xf:
            with xf.element('items'):
                while True:
                    el = (yield)
                    xf.write(el)
                    xf.flush()  # maybe don't flush *every* write

    def _write(self):
        grp = self.elem.attrib['code'][0:2]

        if grp in self.writers:
            writer = self.writers[grp]
        else:
            writer = self.writers[grp] = self._get_writer('part_%s.xml' % grp)
            next(writer)        # run up to `yield` and wait

        writer.send(self.elem)  # write out current `<measure>`
        self.elem = None

    def _add_text(self):
        if self.elem is not None and self.text:
            if self.open:
                self.elem.text = ''.join(self.text)
            else:
                self.elem.tail = ''.join(self.text)
            self.text = []

    def startElement(self, name, attrib):
        if self.stack or name == 'measure':
            self._add_text()
            self.open = True
            self.elem = etree.Element(name, attrib)
            self.stack.append(self.elem)
            if len(self.stack) > 1:
                self.stack[-2].append(self.elem)

    def characters(self, content):
        if self.elem is not None:
            self.text.append(content)

    def endElement(self, name):
        if self.stack:
            self._add_text()
            self.open = False
            self.elem = self.stack.pop()            
            if not self.stack:
                self._write()

    def endDocument(self):
        # clean up
        for writer in self.writers:
            self.writers[writer].close()


def main():
    sax_parser = make_parser()
    sax_parser.setContentHandler(ParseMeasures())
    sax_parser.parse(r'test.xml')

if __name__ == '__main__':
    main()

这会生成 part_08.xml

<items>
    <measure code="0810">
        <condition sequ="001" SID="-5041162"/>
        <footnote Id="00550"/>
        <footnote Id="00735"/>
    </measure>
    <measure code="0811">
        <condition sequ="002" SID="-5041356"/>
        <footnote Id="00555"/>
    </measure>
</items>

part_29.xml

<items>
    <measure code="2915">
        <component Id="01" national="0"/>
        <certif SID="-737740"/>
        <certif SID="-737780"/>
    </measure>
</items>

part_63.xml

<items>
    <measure code="6304">
        <component Id="01" national="1"/>
        <footnote Id="00001"/>
    </measure>
</items>

下面(输出是 3 个名称为 'part_zz.xml' 的文件)

请注意,下面的解决方案不使用任何外部库。

import sys
import xml.etree.ElementTree as ET
from collections import defaultdict
from xml.sax import ContentHandler
from xml.sax import make_parser


class ParseMeasures(ContentHandler):
    def __init__(self):
        self.data = defaultdict(list)
        self.code = None

    def startElement(self, name, attrs):
        if name == 'measure':
            self.code = attrs.get('code')[:2]
        if self.code:
            self.data[self.code].append((name, attrs._attrs))

    def endDocument(self):
        for k, v in self.data.items():
            root = None
            for entry in v:
                if entry[0] == 'measure':
                    if not root:
                        root = ET.Element('items')
                    measure = ET.SubElement(root, 'measure')
                temp = ET.SubElement(measure, entry[0])
                temp.attrib = entry[1]
            tree = ET.ElementTree(root)
            tree.write('part_{}.xml'.format(k), method='xml')


def main(args):
    handler = ParseMeasures()
    sax_parser = make_parser()
    sax_parser.setContentHandler(handler)
    sax_parser.parse('my_large_xml.file.xml')


if __name__ == '__main__':
    main(sys.argv[1:])

输出示例('part_08.xml')

<items>
    <measure>
        <measure code="0810"/>
        <condition SID="-5041162" sequ="001"/>
        <footnote Id="00550"/>
        <footnote Id="00735"/>
    </measure>
    <measure>
        <measure code="0811"/>
        <condition SID="-5041356" sequ="002"/>
        <footnote Id="00555"/>
    </measure>
</items>