How to parse a list of lines into a single group based on the line prefix using pyparsing
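I am trying to parse the output of the command
ip netns exec vpn_ns ipsec stroke statusall
(sample pasted below).
The command prints several lines for each service, identified as oof-#n-#i, where #n is the terminator and #i is the instance using that terminator, so
oof-2-1 is terminator server oof-2, instance 1.
How do I declare a match that collects all the lines prefixed with the same ID?
From this example I am trying to get a dictionary like this: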
results = {
    'connections':
    {
        'oof-1-1': [ 3 lines starting with oof-1-1 in section "Connections" ],
        'oof-1-2': [ 3 lines starting with oof-1-2 in section "Connections" ],
        'oof-2-1': [ 3 lines starting with oof-2-1 in section "Connections" ]
    },
    'sec_assocs':
    {
        'oof-1-1': [ 3 lines starting with oof-1-1 in section "Security Associations" ],
        'oof-1-2': [ 3 lines starting with oof-1-2 in section "Security Associations" ],
        'oof-2-1': [ 3 lines starting with oof-2-1 in section "Security Associations" ]
    }
}
where each ID maps to a list of the lines that start with it.
Here is the full output of the StrongSwan command.
sample = """
Status of IKE charon daemon (strongSwan 5.9.1, Linux 4.15.0-162-generic, x86_64):
uptime: 25 hours, since Mar 23 15:23:53 2022
worker threads: 11 of 16 idle, 5/0/0/0 working, job queue: 0/0/0/0, scheduled: 10
loaded plugins: charon aesni
Listening IP addresses:
169.254.123.2
192.168.51.254
Connections:
oof-1-1: %any...10.1.0.242 IKEv2, dpddelay=30s
oof-1-1: remote: [server] uses public key authentication
oof-1-1: child: dynamic === 0.0.0.0/0 TUNNEL, dpdaction=restart
oof-1-2: %any...10.1.0.242 IKEv2, dpddelay=30s
oof-1-2: remote: [server] uses public key authentication
oof-1-2: child: dynamic === 0.0.0.0/0 TUNNEL, dpdaction=restart
oof-2-1: %any...10.1.0.242 IKEv2, dpddelay=30s
oof-2-1: remote: [server] uses public key authentication
oof-2-1: child: dynamic === 0.0.0.0/0 TUNNEL, dpdaction=restartd
Security Associations:
oof-1-1: %any...10.1.0.242 IKEv2, dpddelay=30s
oof-1-1: remote: [server] uses public key authentication
oof-1-1: child: dynamic === 0.0.0.0/0 TUNNEL, dpdaction=restart
oof-1-2: %any...10.1.0.242 IKEv2, dpddelay=30s
oof-1-2: remote: [server] uses public key authentication
oof-1-2: child: dynamic === 0.0.0.0/0 TUNNEL, dpdaction=restart
oof-2-1: %any...10.1.0.242 IKEv2, dpddelay=30s
oof-2-1: remote: [server] uses public key authentication
oof-2-1: child: dynamic === 0.0.0.0/0 TUNNEL, dpdaction=restartd
"""
Here is the sample used in the parsing solution:
sample = """
Connections:
oof-1-1: %any...10.1.0.242 IKEv2, dpddelay=30s
oof-1-1: remote: [server] uses public key authentication
oof-1-1: child: dynamic === 0.0.0.0/0 TUNNEL, dpdaction=restart
oof-1-2: %any...10.1.0.242 IKEv2, dpddelay=30s
oof-1-2: remote: [server] uses public key authentication
oof-1-2: child: dynamic === 0.0.0.0/0 TUNNEL, dpdaction=restart
oof-2-1: %any...10.1.0.242 IKEv2, dpddelay=30s
oof-2-1: remote: [server] uses public key authentication
oof-2-1: child: dynamic === 0.0.0.0/0 TUNNEL, dpdaction=restartd
Security Associations:
oof-1-1: %any...10.1.0.242 IKEv2, dpddelay=30s
oof-1-1: remote: [server] uses public key authentication
oof-1-1: child: dynamic === 0.0.0.0/0 TUNNEL, dpdaction=restart
oof-1-2: %any...10.1.0.242 IKEv2, dpddelay=30s
oof-1-2: remote: [server] uses public key authentication
oof-1-2: child: dynamic === 0.0.0.0/0 TUNNEL, dpdaction=restart
oof-2-1: %any...10.1.0.242 IKEv2, dpddelay=30s
oof-2-1: remote: [server] uses public key authentication
oof-2-1: child: dynamic === 0.0.0.0/0 TUNNEL, dpdaction=restartd
"""
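The trimmed sample drops the daemon status header that opens the full output. A minimal sketch of one way to cut that header off before parsing, assuming the first section of interest is always introduced by a line reading exactly `Connections:`. The helper name `trim_to_section` and the shortened `full_output` text are made up for illustration.

```python
def trim_to_section(output: str, first_section: str = "Connections:") -> str:
    """Return `output` starting at the line that equals `first_section`."""
    lines = output.splitlines()
    for index, text in enumerate(lines):
        if text.strip() == first_section:
            return "\n".join(lines[index:])
    raise ValueError(f"no {first_section!r} line found")


# Shortened excerpt of the full output shown above (illustrative input only).
full_output = """
Status of IKE charon daemon (strongSwan 5.9.1, Linux 4.15.0-162-generic, x86_64):
uptime: 25 hours, since Mar 23 15:23:53 2022
Listening IP addresses:
169.254.123.2
Connections:
oof-1-1: %any...10.1.0.242 IKEv2, dpddelay=30s
Security Associations:
oof-1-1: %any...10.1.0.242 IKEv2, dpddelay=30s
"""

trimmed = trim_to_section(full_output)
print(trimmed.splitlines()[0])
```

Raising `ValueError` when the marker line is missing is a design choice here, so that unexpected command output fails loudly instead of producing an empty parse.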
Post-processing is the most straightforward way to handle the parsed data. Here is a BNF for the structure you want to parse:
group ::= label ':' line...
label ::= word...
line ::= prefix ':' rest_of_line
prefix ::= word '-' int '-' int
where word and int are simply words made of letters or of digits, and '...' indicates repetition.
This translates to pyparsing as:
import pyparsing as pp
COLON = pp.Suppress(":")
label = pp.Combine(
    pp.Word(pp.alphas)[1, ...], adjacent=False, joinString=" "
)
prefix = pp.Combine(
    pp.Word(pp.alphas) + "-" + pp.Word(pp.nums) + "-" + pp.Word(pp.nums)
)
post_prefix = COLON + pp.restOfLine
line = pp.Group(prefix("prefix") + post_prefix)
lines = pp.Group(line[...])
group = pp.Group(label("group_label") + COLON + lines("subgroups"))
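As a quick sanity check of the grammar so far, the sketch below rebuilds the `label`, `prefix` and `line` expressions and parses one section header and one line. It assumes pyparsing 3.x, so it uses the snake_case names `join_string`, `rest_of_line` and `parse_string` (the equivalents of `joinString`, `restOfLine` and `parseString` used above). The input strings are taken from the sample.

```python
import pyparsing as pp

COLON = pp.Suppress(":")
label = pp.Combine(pp.Word(pp.alphas)[1, ...], adjacent=False, join_string=" ")
prefix = pp.Combine(
    pp.Word(pp.alphas) + "-" + pp.Word(pp.nums) + "-" + pp.Word(pp.nums)
)
line = pp.Group(prefix("prefix") + COLON + pp.rest_of_line)

# A section header: the words before the ':' are joined with single spaces.
header = label.parse_string("Security Associations:")

# One data line: the prefix, then everything after the first ':'.
parsed_line = line.parse_string(
    "oof-2-1: remote: [server] uses public key authentication"
)[0]

print(header[0])
print(parsed_line["prefix"])
print(parsed_line[1].strip())
```

The rest of the line keeps its leading whitespace, which is why the sketch strips it before printing.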
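Pyparsing can generate a railroad diagram of this grammar for you (the create_diagram call in the complete source below writes it to groupby_1.html).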
This parses your text, but to regroup the lines by prefix we can add a parse action that uses itertools.groupby:
def regroup_lines(t):
    from itertools import groupby
    from operator import itemgetter

    ret = pp.ParseResults([])
    parsed_lines = t[0]
    for prefix, subgroup in groupby(parsed_lines, key=itemgetter("prefix")):
        # each line in subgroup has the prefix and the rest of the line after the ':'
        # repackage the multiple lines into a single group that is labeled with
        # the common prefix, and contains the line contents
        ret.append(pp.ParseResults.from_dict(
            {
                'prefix': prefix,
                'lines': [line[1] for line in subgroup],
            }
        ))
    return ret

# attach the action before `lines` is named in `group` (as the complete source
# below does): lines("subgroups") makes a copy of `lines`, and an action added
# afterwards would not reach that copy
lines.add_parse_action(regroup_lines)
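One detail worth keeping in mind: `itertools.groupby` only merges consecutive items that share a key. That suits the sample, where all the lines for one ID are adjacent. The stand-alone sketch below uses plain `(prefix, text)` tuples, made-up stand-ins for the parsed lines, to show what happens when lines for one ID are interleaved and how sorting first handles that case.

```python
from itertools import groupby
from operator import itemgetter


def group_rows(rows):
    """Group (prefix, text) pairs by prefix, in the order groupby sees them."""
    return [
        (key, [text for _, text in members])
        for key, members in groupby(rows, key=itemgetter(0))
    ]


# Hypothetical input where the lines for oof-1-1 are NOT adjacent.
rows = [
    ("oof-1-1", "first"),
    ("oof-1-2", "second"),
    ("oof-1-1", "third"),
]

as_is = group_rows(rows)                              # oof-1-1 shows up twice
merged = group_rows(sorted(rows, key=itemgetter(0)))  # stable sort, then group

print(as_is)
print(merged)
```

Because `sorted` is stable, lines that share a prefix keep their original relative order after sorting.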
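Because this uses a parse action, the regrouping is done at parse time, so no separate post-parsing step is needed.
Now we can parse your sample and get the regrouped results: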
results = group[...].parseString(sample)
Here is a short function that prints the parsed groups:
def print_groups(parsed):
    for group in parsed:
        print(group.group_label)
        for subgroup in group.subgroups:
            print(f"- {subgroup.prefix}")
            for line in subgroup.lines:
                print(f" {line!r}")
        print()

print_groups(results)
This gives:
Connections
- oof-1-1
' %any...10.1.0.242 IKEv2, dpddelay=30s'
' remote: [server] uses public key authentication'
' child: dynamic === 0.0.0.0/0 TUNNEL, dpdaction=restart'
- oof-1-2
' %any...10.1.0.242 IKEv2, dpddelay=30s'
' remote: [server] uses public key authentication'
' child: dynamic === 0.0.0.0/0 TUNNEL, dpdaction=restart'
- oof-2-1
' %any...10.1.0.242 IKEv2, dpddelay=30s'
' remote: [server] uses public key authentication'
' child: dynamic === 0.0.0.0/0 TUNNEL, dpdaction=restartd'
Security Associations
- oof-1-1
' %any...10.1.0.242 IKEv2, dpddelay=30s'
' remote: [server] uses public key authentication'
' child: dynamic === 0.0.0.0/0 TUNNEL, dpdaction=restart'
- oof-1-2
' %any...10.1.0.242 IKEv2, dpddelay=30s'
' remote: [server] uses public key authentication'
' child: dynamic === 0.0.0.0/0 TUNNEL, dpdaction=restart'
- oof-2-1
' %any...10.1.0.242 IKEv2, dpddelay=30s'
' remote: [server] uses public key authentication'
' child: dynamic === 0.0.0.0/0 TUNNEL, dpdaction=restartd'
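To get from this structure to the nested dict asked for in the question, one option is to skip the parse action and collect the lines in ordinary Python. The sketch below is self-contained and assumes pyparsing 3.x (`parse_string`, `rest_of_line`, `join_string`). The `to_dict` helper, the `text` and `lines` results names, and the shortened sample are illustrative choices, and the section labels are used as keys instead of 'connections' and 'sec_assocs'.

```python
import pyparsing as pp

COLON = pp.Suppress(":")
label = pp.Combine(pp.Word(pp.alphas)[1, ...], adjacent=False, join_string=" ")
prefix = pp.Combine(
    pp.Word(pp.alphas) + "-" + pp.Word(pp.nums) + "-" + pp.Word(pp.nums)
)
line = pp.Group(prefix("prefix") + COLON + pp.rest_of_line("text"))
group = pp.Group(label("group_label") + COLON + pp.Group(line[...])("lines"))


def to_dict(parsed):
    """Build {section label: {prefix: [line texts]}} from the parsed groups."""
    result = {}
    for section in parsed:
        by_prefix = result.setdefault(section["group_label"], {})
        for entry in section["lines"]:
            # strip the whitespace that follows the ':' on each line
            by_prefix.setdefault(entry["prefix"], []).append(entry["text"].strip())
    return result


# Shortened excerpt of the sample above (illustrative input only).
short_sample = """
Connections:
oof-1-1: %any...10.1.0.242 IKEv2, dpddelay=30s
oof-1-1: remote: [server] uses public key authentication
oof-1-2: %any...10.1.0.242 IKEv2, dpddelay=30s
Security Associations:
oof-1-1: child: dynamic === 0.0.0.0/0 TUNNEL, dpdaction=restart
"""

as_dict = to_dict(group[...].parse_string(short_sample))
print(as_dict["Connections"]["oof-1-1"])
```

Using `setdefault` instead of `groupby` means lines with the same prefix end up in one list even if they are not adjacent in the input.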
Here is the complete source code of the working example:
import pyparsing as pp
COLON = pp.Suppress(":")
label = pp.Combine(pp.Word(pp.alphas)[1, ...], adjacent=False, joinString=" ")
label.setName("label")
prefix = pp.Combine(pp.Word(pp.alphas) + "-" + pp.Word(pp.nums) + "-" + pp.Word(pp.nums))
prefix.setName("prefix")
post_prefix = COLON + pp.restOfLine
line = pp.Group(prefix("prefix") + post_prefix)
lines = pp.Group(line[...])
def regroup_lines(t):
    from itertools import groupby
    from operator import itemgetter

    ret = pp.ParseResults([])
    for prefix, subgroup in groupby(t[0], key=itemgetter("prefix")):
        ret.append(pp.ParseResults.from_dict(
            {
                'prefix': prefix,
                'lines': [line[1] for line in subgroup],
            }
        ))
    return ret
lines.add_parse_action(regroup_lines)
group = pp.Group(label("group_label") + COLON + lines("subgroups"))
pp.autoname_elements()
group.create_diagram("groupby_1.html", show_results_names=True)
results = group[...].parseString(sample)
def print_groups(parsed):
    for group in parsed:
        print(group.group_label)
        for subgroup in group.subgroups:
            print(f"- {subgroup.prefix}")
            for line in subgroup.lines:
                print(f" {line!r}")
        print()
print_groups(results)
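A caveat when feeding the full command output to this parser: `parseString` stops quietly at the first text the grammar cannot match, so the status header at the top of the full output yields an empty result instead of an error. The sketch below assumes pyparsing 3.x names (`parse_string` and its `parse_all` argument); the grammar is rebuilt without the regrouping action to keep it short, and the two input texts are shortened excerpts. It shows how `parse_all=True` turns the silent stop into a `ParseException`.

```python
import pyparsing as pp

COLON = pp.Suppress(":")
label = pp.Combine(pp.Word(pp.alphas)[1, ...], adjacent=False, join_string=" ")
prefix = pp.Combine(
    pp.Word(pp.alphas) + "-" + pp.Word(pp.nums) + "-" + pp.Word(pp.nums)
)
line = pp.Group(prefix + COLON + pp.rest_of_line)
parser = pp.Group(label + COLON + pp.Group(line[...]))[...]

with_header = """
Status of IKE charon daemon (strongSwan 5.9.1, Linux 4.15.0-162-generic, x86_64):
Connections:
oof-1-1: %any...10.1.0.242 IKEv2, dpddelay=30s
"""
without_header = """
Connections:
oof-1-1: %any...10.1.0.242 IKEv2, dpddelay=30s
"""

# Default behaviour: no group matches at the header, so nothing is returned.
silent = parser.parse_string(with_header)
print(len(silent))

# With parse_all=True the unparsed header is reported as an error.
try:
    parser.parse_string(with_header, parse_all=True)
    header_rejected = False
except pp.ParseException as exc:
    header_rejected = True
    print(f"rejected: {exc}")

# The header-free text parses completely.
complete = parser.parse_string(without_header, parse_all=True)
print(len(complete))
```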