使用 SAX 将大型 XML 文件划分为 Python 中的子文件
Partition large XML files into subfiles in Python using SAX
我需要解析非常大的XML文件(在3-5GB范围内),必须根据XML节点中包含的数据拆分成几个较小的XML文件.
每个输入文件都包含数十万 <measure>
个元素,就像这个(非常)简化的片段。
<items>
<measure code="0810">
<condition sequ="001" SID="-5041162"/>
<footnote Id="00550"/>
<footnote Id="00735"/>
</measure>
<measure code="6304">
<component Id="01" national="1"/>
<footnote Id="00001"/>
</measure>
<measure code="0811">
<condition sequ="002" SID="-5041356"/>
<footnote Id="00555"/>
</measure>
<measure code="2915">
<component Id="01" national="0"/>
<certif SID="-737740"/>
<certif SID="-737780"/>
</measure>
</items>
实际 <measure>
元素的内容几乎可以是任何格式正确的 XML。
我在解析这些文件时需要做两个过程:
- 从
<measure>
个元素的内容中提取信息,并转储
它到 MongoDB 数据库(这部分已解决...)
- 将原始 XML 文件分成 100 个 XML 个子文件
每个
<measure>
的 "code" 属性的前两位数字
节点。也就是说,新的 100 XML 个文件(命名为 'part_00.xml'
'part_99.xml') 需要创建并且每个 <measure>
元素必须是
附加到相应的子文件。
IE。 <measure>
示例中的块 1 和 3 应复制到 'part_08.xml',块 2 应复制到 'part_63.xml'...
我正在使用 SAX 来解析原始文件,上面的过程 1 运行良好。
SAX 进程的纯框架是:
import sys
from xml.sax import ContentHandler
from xml.sax import make_parser
class ParseMeasures(ContentHandler):
code = ''
def startElement(self, name, attrs):
if name == 'measure':
self.code = attrs.get('code')
def endElement(self, name):
if name == 'measure':
print('***Must append <measure> block to file part_{0}.xml'.format(self.code[:2]))
def main(args):
handler = ParseMeasures()
sax_parser = make_parser()
sax_parser.setContentHandler(handler)
sax_parser.parse('my_large_xml.file.xml')
print('Ended')
if __name__ == '__main__':
main(sys.argv[1:])
我需要的是能够访问 'endElement()' 中的整个 <measure>
XML 元素,以将其附加到相应的子文件。
有没有办法将 SAX 与其他 XML 解析功能相结合,从而获得 'endElement()' 中的整个 <measure>
XML 元素? (我可以处理子文件的创建和管理......这不是问题!)
或者 SAX 方法在这种情况下不是最合适的,开始?
"only" 警告是进程应处理 3-5GB 范围内的输入文件...
以下是混合方案,使用内置SAX解析器生成解析事件,lxml构建部分树(仅<measure>
个元素,一次仅一个)。
一旦构建了元素,它就会被 lxml 的 API for incremental XML generation 序列化到不同的文件中,具体取决于 @code
.
的值
此代码处理 <measure>
元素内的任何级别的嵌套,以及包括空格在内的文本值。它目前不处理注释、处理指令或命名空间,但可以添加对这些的支持。
即使输入文件很大,内存消耗也应该保持在较低水平。 lxml会增加一些开销,但是支持迭代编写还是挺方便的。完全手动执行此操作总体上会更快,但也更复杂。
from xml.sax import ContentHandler, make_parser
from lxml import etree
class ParseMeasures(ContentHandler):
def __init__(self):
self.stack = []
self.open = False
self.elem = None
self.writers = {}
self.text = []
def _get_writer(self, filename):
with etree.xmlfile(filename) as xf:
with xf.element('items'):
while True:
el = (yield)
xf.write(el)
xf.flush() # maybe don't flush *every* write
def _write(self):
grp = self.elem.attrib['code'][0:2]
if grp in self.writers:
writer = self.writers[grp]
else:
writer = self.writers[grp] = self._get_writer('part_%s.xml' % grp)
next(writer) # run up to `yield` and wait
writer.send(self.elem) # write out current `<measure>`
self.elem = None
def _add_text(self):
if self.elem is not None and self.text:
if self.open:
self.elem.text = ''.join(self.text)
else:
self.elem.tail = ''.join(self.text)
self.text = []
def startElement(self, name, attrib):
if self.stack or name == 'measure':
self._add_text()
self.open = True
self.elem = etree.Element(name, attrib)
self.stack.append(self.elem)
if len(self.stack) > 1:
self.stack[-2].append(self.elem)
def characters(self, content):
if self.elem is not None:
self.text.append(content)
def endElement(self, name):
if self.stack:
self._add_text()
self.open = False
self.elem = self.stack.pop()
if not self.stack:
self._write()
def endDocument(self):
# clean up
for writer in self.writers:
self.writers[writer].close()
def main():
sax_parser = make_parser()
sax_parser.setContentHandler(ParseMeasures())
sax_parser.parse(r'test.xml')
if __name__ == '__main__':
main()
这会生成 part_08.xml
<items>
<measure code="0810">
<condition sequ="001" SID="-5041162"/>
<footnote Id="00550"/>
<footnote Id="00735"/>
</measure>
<measure code="0811">
<condition sequ="002" SID="-5041356"/>
<footnote Id="00555"/>
</measure>
</items>
和part_29.xml
<items>
<measure code="2915">
<component Id="01" national="0"/>
<certif SID="-737740"/>
<certif SID="-737780"/>
</measure>
</items>
和part_63.xml
<items>
<measure code="6304">
<component Id="01" national="1"/>
<footnote Id="00001"/>
</measure>
</items>
下面(输出是 3 个名称为 'part_zz.xml' 的文件)
请注意,下面的解决方案不使用任何外部库。
import sys
import xml.etree.ElementTree as ET
from collections import defaultdict
from xml.sax import ContentHandler
from xml.sax import make_parser
class ParseMeasures(ContentHandler):
def __init__(self):
self.data = defaultdict(list)
self.code = None
def startElement(self, name, attrs):
if name == 'measure':
self.code = attrs.get('code')[:2]
if self.code:
self.data[self.code].append((name, attrs._attrs))
def endDocument(self):
for k, v in self.data.items():
root = None
for entry in v:
if entry[0] == 'measure':
if not root:
root = ET.Element('items')
measure = ET.SubElement(root, 'measure')
temp = ET.SubElement(measure, entry[0])
temp.attrib = entry[1]
tree = ET.ElementTree(root)
tree.write('part_{}.xml'.format(k), method='xml')
def main(args):
handler = ParseMeasures()
sax_parser = make_parser()
sax_parser.setContentHandler(handler)
sax_parser.parse('my_large_xml.file.xml')
if __name__ == '__main__':
main(sys.argv[1:])
输出示例('part_08.xml')
<items>
<measure>
<measure code="0810"/>
<condition SID="-5041162" sequ="001"/>
<footnote Id="00550"/>
<footnote Id="00735"/>
</measure>
<measure>
<measure code="0811"/>
<condition SID="-5041356" sequ="002"/>
<footnote Id="00555"/>
</measure>
</items>
我需要解析非常大的XML文件(在3-5GB范围内),必须根据XML节点中包含的数据拆分成几个较小的XML文件.
每个输入文件都包含数十万 <measure>
个元素,就像这个(非常)简化的片段。
<items>
<measure code="0810">
<condition sequ="001" SID="-5041162"/>
<footnote Id="00550"/>
<footnote Id="00735"/>
</measure>
<measure code="6304">
<component Id="01" national="1"/>
<footnote Id="00001"/>
</measure>
<measure code="0811">
<condition sequ="002" SID="-5041356"/>
<footnote Id="00555"/>
</measure>
<measure code="2915">
<component Id="01" national="0"/>
<certif SID="-737740"/>
<certif SID="-737780"/>
</measure>
</items>
实际 <measure>
元素的内容几乎可以是任何格式正确的 XML。
我在解析这些文件时需要做两个过程:
- 从
<measure>
个元素的内容中提取信息,并转储 它到 MongoDB 数据库(这部分已解决...) - 将原始 XML 文件分成 100 个 XML 个子文件
每个
<measure>
的 "code" 属性的前两位数字 节点。也就是说,新的 100 XML 个文件(命名为 'part_00.xml' 'part_99.xml') 需要创建并且每个<measure>
元素必须是 附加到相应的子文件。 IE。<measure>
示例中的块 1 和 3 应复制到 'part_08.xml',块 2 应复制到 'part_63.xml'...
我正在使用 SAX 来解析原始文件,上面的过程 1 运行良好。 SAX 进程的纯框架是:
import sys
from xml.sax import ContentHandler
from xml.sax import make_parser
class ParseMeasures(ContentHandler):
code = ''
def startElement(self, name, attrs):
if name == 'measure':
self.code = attrs.get('code')
def endElement(self, name):
if name == 'measure':
print('***Must append <measure> block to file part_{0}.xml'.format(self.code[:2]))
def main(args):
handler = ParseMeasures()
sax_parser = make_parser()
sax_parser.setContentHandler(handler)
sax_parser.parse('my_large_xml.file.xml')
print('Ended')
if __name__ == '__main__':
main(sys.argv[1:])
我需要的是能够访问 'endElement()' 中的整个 <measure>
XML 元素,以将其附加到相应的子文件。
有没有办法将 SAX 与其他 XML 解析功能相结合,从而获得 'endElement()' 中的整个 <measure>
XML 元素? (我可以处理子文件的创建和管理......这不是问题!)
或者 SAX 方法在这种情况下不是最合适的,开始?
"only" 警告是进程应处理 3-5GB 范围内的输入文件...
以下是混合方案,使用内置SAX解析器生成解析事件,lxml构建部分树(仅<measure>
个元素,一次仅一个)。
一旦构建了元素,它就会被 lxml 的 API for incremental XML generation 序列化到不同的文件中,具体取决于 @code
.
此代码处理 <measure>
元素内的任何级别的嵌套,以及包括空格在内的文本值。它目前不处理注释、处理指令或命名空间,但可以添加对这些的支持。
即使输入文件很大,内存消耗也应该保持在较低水平。 lxml会增加一些开销,但是支持迭代编写还是挺方便的。完全手动执行此操作总体上会更快,但也更复杂。
from xml.sax import ContentHandler, make_parser
from lxml import etree
class ParseMeasures(ContentHandler):
def __init__(self):
self.stack = []
self.open = False
self.elem = None
self.writers = {}
self.text = []
def _get_writer(self, filename):
with etree.xmlfile(filename) as xf:
with xf.element('items'):
while True:
el = (yield)
xf.write(el)
xf.flush() # maybe don't flush *every* write
def _write(self):
grp = self.elem.attrib['code'][0:2]
if grp in self.writers:
writer = self.writers[grp]
else:
writer = self.writers[grp] = self._get_writer('part_%s.xml' % grp)
next(writer) # run up to `yield` and wait
writer.send(self.elem) # write out current `<measure>`
self.elem = None
def _add_text(self):
if self.elem is not None and self.text:
if self.open:
self.elem.text = ''.join(self.text)
else:
self.elem.tail = ''.join(self.text)
self.text = []
def startElement(self, name, attrib):
if self.stack or name == 'measure':
self._add_text()
self.open = True
self.elem = etree.Element(name, attrib)
self.stack.append(self.elem)
if len(self.stack) > 1:
self.stack[-2].append(self.elem)
def characters(self, content):
if self.elem is not None:
self.text.append(content)
def endElement(self, name):
if self.stack:
self._add_text()
self.open = False
self.elem = self.stack.pop()
if not self.stack:
self._write()
def endDocument(self):
# clean up
for writer in self.writers:
self.writers[writer].close()
def main():
sax_parser = make_parser()
sax_parser.setContentHandler(ParseMeasures())
sax_parser.parse(r'test.xml')
if __name__ == '__main__':
main()
这会生成 part_08.xml
<items>
<measure code="0810">
<condition sequ="001" SID="-5041162"/>
<footnote Id="00550"/>
<footnote Id="00735"/>
</measure>
<measure code="0811">
<condition sequ="002" SID="-5041356"/>
<footnote Id="00555"/>
</measure>
</items>
和part_29.xml
<items>
<measure code="2915">
<component Id="01" national="0"/>
<certif SID="-737740"/>
<certif SID="-737780"/>
</measure>
</items>
和part_63.xml
<items>
<measure code="6304">
<component Id="01" national="1"/>
<footnote Id="00001"/>
</measure>
</items>
下面(输出是 3 个名称为 'part_zz.xml' 的文件)
请注意,下面的解决方案不使用任何外部库。
import sys
import xml.etree.ElementTree as ET
from collections import defaultdict
from xml.sax import ContentHandler
from xml.sax import make_parser
class ParseMeasures(ContentHandler):
def __init__(self):
self.data = defaultdict(list)
self.code = None
def startElement(self, name, attrs):
if name == 'measure':
self.code = attrs.get('code')[:2]
if self.code:
self.data[self.code].append((name, attrs._attrs))
def endDocument(self):
for k, v in self.data.items():
root = None
for entry in v:
if entry[0] == 'measure':
if not root:
root = ET.Element('items')
measure = ET.SubElement(root, 'measure')
temp = ET.SubElement(measure, entry[0])
temp.attrib = entry[1]
tree = ET.ElementTree(root)
tree.write('part_{}.xml'.format(k), method='xml')
def main(args):
handler = ParseMeasures()
sax_parser = make_parser()
sax_parser.setContentHandler(handler)
sax_parser.parse('my_large_xml.file.xml')
if __name__ == '__main__':
main(sys.argv[1:])
输出示例('part_08.xml')
<items>
<measure>
<measure code="0810"/>
<condition SID="-5041162" sequ="001"/>
<footnote Id="00550"/>
<footnote Id="00735"/>
</measure>
<measure>
<measure code="0811"/>
<condition SID="-5041356" sequ="002"/>
<footnote Id="00555"/>
</measure>
</items>