Python 在 for 循环中使用 iter 时 lxml 内存不足

Python lxml out of memory while using iter in a for loop

我正在编写一个解决方案以从文件中提取信息。 这些文件是通过其他脚本在 Windows Event Utility 命令中生成的(我没有调用,只是接收文件进行解析):

wevtutil qe Application /q:"*[System[Provider[@Name='NameOfTheSourceApplication']]]" >> %FILE%

此命令将有关查询的源应用程序的所有输出保存到转储文件中,最终每行上的每个 事件 都有一个 XML。我只关心 EventDataTimeCreated SystemTime.

示例输出:

<?xml version="1.0" encoding="UTF-8"?>
<Event xmlns="http://schemas.microsoft.com/win/2004/08/events/event">
   <System>
      <Provider Name="" />
      <EventID Qualifiers="">0</EventID>
      <Level>4</Level>
      <Task>0</Task>
      <Keywords />
      <TimeCreated SystemTime="2018-10-02T11:19:41.000000000Z" />
      <EventRecordID />
      <Channel>Application</Channel>
      <Computer />
      <Security />
   </System>
   <EventData>
      DATA
      <Data />
   </EventData>
</Event>

转储文件完成后,文件可能会很大(超过6-7GB)。所以我使用 Linux iconv 实用程序将源文件编码从 UTF-16/UCS2-LE(wevutil 的默认编码)更改为 UTF-8,新编码减少了几乎一半的文件尺寸。然后我将 grouper 配方与一些简单的文件拆分功能结合使用,以便将大转储文件拆分为较小的文件:

def grouper(n, iterable, fillvalue=None):
   """Collect data into fixed-length chunks or blocks"""
   # grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx
   args = [iter(iterable)] * n
   return zlg(fillvalue=fillvalue, *args)

def splitter(fileobj,outName,ranoutName,rencode,wencode):
    with open(fileobj,"r",encoding='UTF-8',errors='replace') as f:
        for i, g in enumerate(grouper(n, f, fillvalue=''), 1):
            with open('{0}_{1}.xml'.format(i,outName), 'w',encoding=wencode) as fout:
                fout.writelines(g)
                print("Splitting file : %s" % (fileobj))

因为这些文件实际上不是 XML 文件,而是格式为 xml 的每一行,带有命名空间,我将一个根标记添加到每个拆分的文件中,由 lxml 稍后(glst 代表 "globbed list")。

def rooter(glst):
    for logFiles in glst:
        oFile = open(logFiles,'r',encoding='utf-8')
        rFile = oFile.read()
        wFile = open(logFiles,'w',encoding='utf-8')
        wFile.write('<root>')
        wFile.write(rFile)
        wFile.write('</root>')
        oFile.close()
        wFile.close()

        print("Rooting XML : %s" % (logFiles))

然后我只加载一个 XML 文件以在 lxml:

中解析
def loadXml(fileobj):
    tree = etree.parse(fileobj)
    print("Processing : %s" % (fileobj))
    return tree

我的瓶颈来了,因为当我只寻找 Event Data 和我的 Event Time 时,我没有找到任何其他方便的方法来有效地解析文件。找到数据后,我将我的发现附加到两个单独的列表(一个用于事件数据,一个用于事件时间),稍后我将其转换为一个简单的 CSV 文件,以便通过 [=29= 继续我的解析].

此代码实际上适用于 2GB 以下的文件,但是 运行 在解析超过 2GB 的文件时完全内存不足,我的解决方案必须 运行 在只有 2-3GB 空闲 RAM 的系统中(Windows 64 位桌面)。

def parser(tree,DataL,DataTimeL):
    for evts in tree.iter('{%s}EventData' % nameSpace):
        EvtData = evts.find('{%s}Data' % nameSpace).text
        DataL.append(EvtData)        
    for evtSysTime in tree.iter('{%s}System' % nameSpace):
        eSysTime = evtSysTime.find('{%s}TimeCreated' % nameSpace).attrib
        DataTimeL.append(eSysTime)
        break

我尝试在解析后手动 gc.collectdel 文件对象,但它似乎没有任何效果并且 python 一直在积累内存直到 PC 崩溃。

def initParser(glst,DataL,DataTimeL):
    for file in glst:
     root = loadXml(file)
     parser(root,DataL,DataTimeL)
     gc.collect()
     del file

CSV 创建(zlg 代表 itertools - zip_longest):

with open('LogOUT.csv', 'w', encoding="UTF-8", newline='') as cF:
    wr = csv.writer(cF)
    wr.writerow(("Event", "Event Time"))
    wr.writerows(zlg(EvtL,EvtTimeL))

我尝试过使用 TinyDB、ZODB,这听起来有点矫枉过正,而且速度太慢或者我做错了。手动将事件转储到 CSV 非常慢。 由于 for 循环解析器函数实际上对 2GB 以下的文件非常有效,我想找到一种方法来安全有效地附加这些大列表而不会使整个系统崩溃。

提前致谢。

这是一个概念证明,它使用迭代器读取一个包含 self-contained XML 行的大文件,并将特定字段提取到 CSV 文件中。修改它以满足您的需要。

import csv
import itertools
import typing
from io import StringIO
from xml.etree import ElementTree
from xml.etree.ElementTree import Element


def grouper(iterable, n, fill=None) -> typing.Iterator:
    args = [iter(iterable)] * n
    return itertools.zip_longest(*args, fillvalue=fill)


def parse_event_xml(event_xml: str) -> dict:
    root: Element = ElementTree.fromstring(event_xml)
    namespaces = {'ns': 'http://schemas.microsoft.com/win/2004/08/events/event'}
    time_el = root.find('./ns:System/ns:TimeCreated', namespaces=namespaces)
    data_el = root.find('./ns:EventData', namespaces=namespaces)

    return {
        'Event Time': time_el.attrib['SystemTime'],
        'Event Data': data_el.text,
    }


def process_batch(batch: typing.Iterator[str], batch_filename: str) -> None:
    fields = ['Event Time', 'Event Data']
    with open(batch_filename, 'w', newline='') as bf:
        writer = csv.DictWriter(bf, fieldnames=fields)
        writer.writeheader()
        for item in batch:
            if not item:  # skip empty lines
                continue
            parsed = parse_event_xml(item)
            writer.writerow(parsed)


if __name__ == '__main__':
    xml_raw = '''<Event xmlns='http://schemas.microsoft.com/win/2004/08/events/event'><System><Provider Name=''/><EventID Qualifiers=''>0</EventID><Level>4</Level><Task>0</Task><Keywords></Keywords><TimeCreated SystemTime='2018-10-02T11:19:41.000000000Z'/><EventRecordID></EventRecordID><Channel>Application</Channel><Computer></Computer><Security/></System><EventData>DATA<Data></Data></EventData></Event>
    <Event xmlns='http://schemas.microsoft.com/win/2004/08/events/event'><System><Provider Name=''/><EventID Qualifiers=''>0</EventID><Level>4</Level><Task>0</Task><Keywords></Keywords><TimeCreated SystemTime='2018-10-02T11:19:41.000000000Z'/><EventRecordID></EventRecordID><Channel>Application</Channel><Computer></Computer><Security/></System><EventData>DATA<Data></Data></EventData></Event>'''

    batch_size = 10  # lines / events

    # read the event stream
    # normally you'd use `with open(filename, encoding='utf-8')`
    # but here i'm reading from a string
    with StringIO(xml_raw) as f:
        for i, batch in enumerate(grouper(f, batch_size)):
            batch_filename = f'batch_{i}.csv'
            process_batch(batch, batch_filename)

调整parse_event_xml函数提取你需要的数据,这里我只用了EventTimeEventData

这个输出一个像这样的 csv 文件:

Event Time,Event Data
2018-10-02T11:19:41.000000000Z,DATA
2018-10-02T11:19:41.000000000Z,DATA