使用 pyparsing 解析多行结构化错误日志

Parsing multiline structured error log with pyparsing

我有一个包含 100 个条目的结构化错误日志。每个条目都有一个非常具体的结构,我正在尝试对其进行解析以在 Excel.

中进行进一步分析

解析 100 个条目没问题。每个条目中的结构还不存在,因为这是我第一次使用 pyparsing,所以我对如何从这里取得进展有点迷茫。请参阅下面的工作示例。

from pyparsing.core import OneOrMore

import datetime

from collections import defaultdict

test_string = """
    ErrorLog[1].ErrorDate := D#2021-3-3;
    ErrorLog[1].ErrorTime := TOD#16:1:49.567;
    ErrorLog[1].LocationRef := 20432;
    ErrorLog[1].ErrorCode := 105;
    ErrorLog[1].FirstCheck.Pos[1].Loc := 0;
    ErrorLog[1].FirstCheck.Pos[1].MatchedPos := 0;
    ErrorLog[1].FirstCheck.Pos[2].Loc := 12003;
    ErrorLog[1].FirstCheck.Pos[2].MatchedPos := 5;
    ErrorLog[1].SecondCheck.ID[1] := '4';
    ErrorLog[1].SecondCheck.ID[2] := '9';
    ErrorLog[1].SecondCheck.ID[3] := '0';
    ErrorLog[1].SecondCheck.ID[4] := '7';
    ErrorLog[1].SecondCheck.ID[5] := '0';
    ErrorLog[1].SecondCheck.ID[6] := '1';
    ErrorLog[1].SecondCheck.ID[7] := '8';
    ErrorLog[1].SecondCheck.ID[8] := '4';
    ErrorLog[1].SecondCheck.ID[9] := '2';
    ErrorLog[1].SecondCheck.ID[10] := '4';
    ErrorLog[1].SecondCheck.ID[11] := '0';
    ErrorLog[1].SecondCheck.ID[12] := '6';
    ErrorLog[1].SecondCheck.ID[13] := '7';
    ErrorLog[1].SecondCheck.ID[14] := '7';
    ErrorLog[1].SecondCheck.ID[15] := '1';
    ErrorLog[1].SecondCheck.ID[16] := '0';
    ErrorLog[1].SecondCheck.ID[17] := '8';
    ErrorLog[1].SecondCheck.ID[18] := '3';
    ErrorLog[1].SecondCheck.PositionCount := 5;
    ErrorLog[1].SecondCheck.Pos[1].Loc := 11036;
    ErrorLog[1].SecondCheck.Pos[1].TotalQty := 1;
    ErrorLog[1].SecondCheck.Pos[1].MatchedQty := 1;
    ErrorLog[1].SecondCheck.Pos[2].Loc := 11031;
    ErrorLog[1].SecondCheck.Pos[2].TotalQty := 1;
    ErrorLog[1].SecondCheck.Pos[2].MatchedQty := 1;
"""

LBRK, RBRK, DOT, SEMI, COLON, DASH = map(Suppress, "[].;:-")

integer = Word(nums).setParseAction(lambda t:int(t[0]))
date = (Suppress("D#") + integer + DASH + integer + DASH + integer).setParseAction(lambda t:datetime.datetime(*t))
time = Suppress("TOD#") + integer + COLON + integer + COLON + integer + DOT + integer

key = Word(printables)
value = date | time | Word(printables, exclude_chars=";") 

ID = Suppress("ErrorLog") + LBRK + Word(nums) + RBRK + DOT

struct = Forward()
error_expr = Group(ID("id") + key("key") + Suppress(":=") + value("value") + SEMI)

struct << Dict(OneOrMore(error_expr))

parse_results = struct.parse_file('test.txt')
errors = defaultdict(list)

for event in parse_results:
    errors[event[0]].append(event[2])

print(errors)

这将输出以下结构

defaultdict(<class 'list'>, {'1': [datetime.datetime(2021, 3, 3, 0, 0), 16, '20432', '105', '0', '0', '12003', '5', "'4'", "'9'", "'0'", "'7'", "'0'", "'1'", "'8'", "'4'", "'2'", "'4'", "'0'", "'6'", "'7'", "'7'", "'1'", "'0'", "'8'", "'3'", 
'5', '11036', '1', '1', '11031', '1', '1']})

我正在尝试解决的问题

  1. 我想在解析日期时包含时间戳,以形成一个单一的日期时间。

  2. SecondCheck 包含一个基本上由 18 个字符组成的字符串的 ID。我想将这些解析为一个字段。

  3. 理想情况下,输出格式应该是字典列表,每个字典都包含键值对。

思考过程

看来我需要使用分号以外的东西来区分不同的字段。使用分号可以很好地处理除了应该从多行聚合的字段之外的任何事情,我想我已经了解了构造解析器元素的基本原则,但是在我的头撞了几天之后,我很高兴获取有关如何解决此问题的一些技巧或提示。

跨多行的值的组合实际上比看起来更复杂,所以我将问题分解为解析各个行,然后将解析的结果合并到所需的结构中。

您定义标点符号和值表达式的初始代码是一个好的开始。我还添加了一个用于解析引用字符串的表达式:

LBRK, RBRK, DOT, SEMI, COLON, DASH = map(Suppress, "[].;:-")

integer = Word(nums).setParseAction(lambda t: int(t[0]))
date = (Suppress("D#") + integer + DASH + integer + DASH + integer).setParseAction(lambda t: datetime.datetime(*t))
time = Suppress("TOD#") + integer + COLON + integer + COLON + integer + DOT + integer
time.setParseAction(lambda t: datetime.time(hour=t[0], minute=t[1], second=t[2], microsecond=t[3]*1000))
qs = quoted_string.add_parse_action(remove_quotes)

为您的 key 使用 Word(printables) 使键结构只是扁平字符串,但我们需要将它们解析为多个部分,同时识别某些部分何时实际上是索引列表项。

只是为了进一步分解这些步骤,这里有一个用于单行输入的准 BNF(替代项用 '|' 分隔,可选元素在 []' 中,重复显示与...):

error_expr ::= key ':=' value ';'
key ::= name[index] ['.' name[index]]...
name ::= alpha...
index ::= '[' integer ']'
value ::= integer | date | time | quoted_string | non-semi...

(我添加了一堆 setName() 调用来标记这些表达式,并使用新的 pyparsing create_diagram() 方法生成了这张铁路图。)

我们将使 key 表达式比仅匹配任何一组可打印文件更明确,但实际上会解析单独的名称和可选的 [n] 索引。我还添加了解析操作来制作名称索引部分元组,以及一个使整个键成为元组的操作:

key = delimited_list((name + Optional(index)).add_parse_action(tuple), delim=".")
key.add_parse_action(tuple)

error_expr 的东西很好。由于我们只是解析单独的行,因此 struct 不一定是 Forward,它可以是一个或多个 error_exprs。我保留了您的 Dict 结构,因为它将在解决方案的第二部分派上用场:

value = integer | date | time | qs | Word(printables, exclude_chars=";")
error_expr = Group(key("key") + Suppress(":=") + value("value") + SEMI)

struct = Dict(OneOrMore(error_expr))

使用这个解析器,我解析了你的测试字符串,并使用 pprint 打印出结果:

parse_results = struct.parse_string(test_string)

from pprint import pprint
pprint(parse_results.as_dict())

这些部分结果:

{(('ErrorLog', 1), ('ErrorCode',)): 105,
 (('ErrorLog', 1), ('ErrorDate',)): datetime.datetime(2021, 3, 3, 0, 0),
 (('ErrorLog', 1), ('ErrorTime',)): datetime.time(16, 1, 49, 567000),
 (('ErrorLog', 1), ('FirstCheck',), ('Pos', 1), ('Loc',)): 0,
 (('ErrorLog', 1), ('FirstCheck',), ('Pos', 1), ('MatchedPos',)): 0,
 (('ErrorLog', 1), ('FirstCheck',), ('Pos', 2), ('Loc',)): 12003,
 (('ErrorLog', 1), ('FirstCheck',), ('Pos', 2), ('MatchedPos',)): 5,
 (('ErrorLog', 1), ('LocationRef',)): 20432,
 ...
 

我们可以看到我们得到了一个字典,其中每个键是一个名称索引的元组,或者只是错误表达式键中每个部分的名称,值是解析后的值。所以下一步是处理这个键值列表,并将它们构建到一个结构中。

由于我们要处理一系列键并按连续的键名提取组,因此 itertools.groupby 是合乎逻辑的选择。 groupby 可以完成一系列项目,并 return 根据某些关键功能将它们分组。

这段代码非常复杂,是问题的主要部分。

def make_nested_groups(parent, idx, seq):
    from itertools import groupby

    # every item in seq is a tuple of either (name,) or (name, #)
    # to detect and merge lists, group by name
    for field_label, field_subs in groupby(seq, lambda x: x[idx][0]):
        current = []
        # get subgroups by separate element number
        for field, subfields in groupby(field_subs, key=lambda x: x[idx]):
            subs = list(subfields)
            # if indexes are given, this is a list of subitems
            if len(field) > 1:
                if not current:
                    parent.append([field_label, current])

                # if we are at the last part of the key, just append
                # the value; otherwise, append a nested group
                if len(subs[0]) == idx + 1:
                    current.append(parse_results[subs[0]])
                else:
                    vals = []
                    make_nested_groups(vals, idx + 1, subs)
                    current.append(vals)

            else:
                # no index, this is just a sub-structure or a single value
                sub = subs[0]
                if len(sub) > idx+1:
                    vals = []
                    make_nested_groups(vals, idx + 1, subs)
                    parent.append([field[0], vals])
                else:
                    parent.append([field[0], parse_results[sub]])


errors = []
make_nested_groups(errors, 0, [pr[0] for pr in parse_results])

现在项目有更多的结构(如存储在 errors 中):

[['ErrorLog',
  [[['ErrorDate', datetime.datetime(2021, 3, 3, 0, 0)],
    ['ErrorTime', datetime.time(16, 1, 49, 567000)],
    ['LocationRef', 20432],
    ['ErrorCode', 105],
    ['FirstCheck',
     [['Pos',
       [[['Loc', 0], ['MatchedPos', 0]],
        [['Loc', 12003], ['MatchedPos', 5]]]]]],
    ['SecondCheck',
     [['ID',
       ['4',
        '9',
        '0',
        ...

将其转换为嵌套字典要简单得多:

def make_nested_dict(seq):
    try:
        seq_dict = dict(seq)
        return {k: make_nested_dict(v) for k, v in seq_dict.items()}
    except (ValueError, TypeError):
        if isinstance(seq, list):
            return [make_nested_dict(s) for s in seq]
        return seq

error_struct = make_nested_dict(errors)

这将 pprint() 作为:

{'ErrorLog': [{'ErrorCode': 105,
               'ErrorDate': datetime.datetime(2021, 3, 3, 0, 0),
               'ErrorTime': datetime.time(16, 1, 49, 567000),
               'FirstCheck': {'Pos': [{'Loc': 0, 'MatchedPos': 0},
                                      {'Loc': 12003, 'MatchedPos': 5}]},
               'LocationRef': 20432,
               'SecondCheck': {'ID': ['4',
                                      '9',
                                      '0',
                                      '7',
                                      '0',
                                      '1',
                                      '8',
                                      '4',
                                      '2',
                                      '4',
                                      '0',
                                      '6',
                                      '7',
                                      '7',
                                      '1',
                                      '0',
                                      '8',
                                      '3'],
                               'Pos': [{'Loc': 11036,
                                        'MatchedQty': 1,
                                        'TotalQty': 1},
                                       {'Loc': 11031,
                                        'MatchedQty': 1,
                                        'TotalQty': 1}],
                               'PositionCount': 5}}]}

我将留给您将 ID 字段合并为单个字符串,并将 ErrorDateErrorTime 合并为日期时间的最后部分。