如何把iptables log变成json?
How to turn iptables log into json?
我有一些 rsyslog iptables 日志,每一行都是这样的:
Jul 2 06:24:39 mizar kernel: [1746506.948083] NETFILTER: IN=ens192 OUT= MAC=00:0c:29:7d:e8:00:a4:91:b1:e2:a3:34:08:00 SRC=89.248.165.203 DST=192.168.255.2 LEN=44 TOS=0x00 PREC=0x00 TTL=249 ID=9739 PROTO=TCP SPT=44587 DPT=12005 WINDOW=1024 RES=0x00 SYN URGP=0
是否有 bash、python 或其他方式,可以将其转换为扁平的 json 文件?每个标签都必须显示出来,例如:
{ "host": [
{ "name": "mizar",
"timestamp": "Jul 2 06:24:39",
"in": "ens192",
"out": "",
"src": "89.248.165.203",
"dst": "192.168.255.2",
... omissis...
}
] }
每个字段都必须转换。
谢谢!
进一步参考:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Jul 18 11:12:29 2021
@author: mf18626
"""
import json
import ipaddress
import urllib.request
import time
# Endpoint queried for the country of a public IP address.
_GEO_IP_API_URL = "http://ip-api.com/json/"
# Sliding-window throttle: at most _GEO_MAX_QUERIES lookups are started per
# _GEO_WINDOW_SECONDS (the original script used the same 44 / 60 values).
_GEO_MAX_QUERIES = 44
_GEO_WINDOW_SECONDS = 60
# Upper bound, in seconds, on how long a single geo lookup may block.
_GEO_TIMEOUT_SECONDS = 10

# "KEY=value" tokens that are copied into the output, and the field they fill.
_FIELD_BY_KEY = {
    "in": "inIface",
    "out": "outIface",
    "src": "srcIP",
    "dst": "dstIP",
    "proto": "proto",
    "spt": "spt",
    "dpt": "dpt",
}
# Keys whose value is an IP address that also gets a location field.
_LOCATION_FIELD_BY_KEY = {"src": "srcLocation", "dst": "dstLocation"}
# Bare tokens (no "=") that are recorded as flags.
_TCP_FLAGS = ("ack", "syn", "psh", "fin", "rst")


def _throttle_geo_queries(ipNations):
    """Wait until one more geo query fits in the rate window, then record it."""
    query_times = ipNations.setdefault("queryCounter", [])
    now = int(time.time())
    # Keep only timestamps that are still inside the window; anything that is
    # not a number (e.g. a stray debug marker) is discarded as well.
    recent = [
        stamp for stamp in query_times
        if isinstance(stamp, (int, float)) and now - stamp < _GEO_WINDOW_SECONDS
    ]
    if len(recent) >= _GEO_MAX_QUERIES:
        # Timestamps are appended chronologically, so recent[0] is the oldest;
        # once it leaves the window there is room for one more query.
        wait = _GEO_WINDOW_SECONDS - (now - recent[0])
        print("GEO IP API limit reached: throttling %d seconds" % wait)
        time.sleep(wait)
        now = int(time.time())
        recent = [stamp for stamp in recent if now - stamp < _GEO_WINDOW_SECONDS]
    recent.append(now)
    # Update in place so any other reference to the list sees the same data.
    query_times[:] = recent


def _lookup_location(ip_text, ipNations):
    """Return the location label for *ip_text*, using *ipNations* as a cache.

    Returns "IANA PrivNet" for private addresses, the cached or freshly
    fetched country for public ones, and "" when the text is not a valid IP
    address or the lookup yields no country.
    """
    try:
        address = ipaddress.ip_address(ip_text)
    except ValueError:
        # Empty or malformed value (e.g. "SRC="): nothing to locate.
        return ""
    if address.is_private:
        return "IANA PrivNet"
    if ip_text in ipNations:
        return ipNations[ip_text]

    _throttle_geo_queries(ipNations)
    print("Requesting %s" % ip_text)
    try:
        with urllib.request.urlopen(
            _GEO_IP_API_URL + ip_text, timeout=_GEO_TIMEOUT_SECONDS
        ) as response:
            payload = json.loads(response.read().decode("utf-8"))
    except (OSError, ValueError) as error:
        # URLError/timeouts are OSError subclasses; bad JSON is a ValueError.
        # A failed lookup must not abort the parsing of the log line.
        print("GEO IP lookup failed for %s: %s" % (ip_text, error))
        return ""
    country = payload.get("country") if isinstance(payload, dict) else None
    if country is None:
        # The response carried no "country" entry; do not cache the failure.
        return ""
    ipNations[ip_text] = country
    return country


def parse(logLine, ipNations):
    """Convert one iptables syslog line into a flat dict of string fields.

    Parameters:
        logLine: the raw log line; it is split on whitespace.
        ipNations: mutable dict shared between calls. Its "queryCounter"
            entry holds the timestamps of recent geo lookups (used for
            throttling); every other key is an IP address mapped to the
            country already resolved for it.

    Returns:
        A dict with the keys timestamp, eventId, inIface, outIface, srcIP,
        dstIP, spt, dpt, proto, syn, ack, psh, fin, rst, srcLocation and
        dstLocation. Fields that the line does not provide stay "".
        Values taken from "KEY=value" tokens are lower-cased.

    A token is matched on its exact key (the text before "="), so e.g.
    "PHYSIN=" is not mistaken for "IN=". When a key occurs more than once
    the last occurrence wins. Lines with too few tokens no longer raise;
    the missing positional fields are simply left empty.
    """
    tokens = logLine.split()

    # Initialise every field up front: they are common to all protocols, and
    # a given log line only populates some of them.
    fields = {
        "timestamp": "",
        "eventId": "",
        "inIface": "",
        "outIface": "",
        "srcIP": "",
        "dstIP": "",
        "spt": "",
        "dpt": "",
        "proto": "",
        "syn": "",
        "ack": "",
        "psh": "",
        "fin": "",
        "rst": "",
        "srcLocation": "",
        "dstLocation": "",
    }

    # The timestamp is always the first token.
    if tokens:
        fields["timestamp"] = tokens[0]
    # The fourth token is always used as the identifier, for later ordering
    # in Elastic.
    if len(tokens) > 3:
        fields["eventId"] = tokens[3]

    for raw_token in tokens:
        # Square brackets wrap some tokens (e.g. "[SRC=..."); drop them so
        # such tokens are still recognised by key.
        token = raw_token.lower().strip("[]")
        key, separator, value = token.partition("=")
        if not separator:
            if key in _TCP_FLAGS:
                fields[key] = key
            continue
        field_name = _FIELD_BY_KEY.get(key)
        if field_name is None:
            continue
        fields[field_name] = value
        location_field = _LOCATION_FIELD_BY_KEY.get(key)
        if location_field is not None:
            fields[location_field] = _lookup_location(value, ipNations)
    return fields
def readlines_then_tail(fin):
    """Yield every line already in *fin*, then keep following it for new ones.

    Parameters:
        fin: an open file object supporting readline(), tell() and seek().

    Yields:
        Each line as returned by readline(). Once the end of the file is
        reached, iteration is handed over to tail(), which polls for
        appended lines, so this generator never finishes on its own.
    """
    while True:
        line = fin.readline()
        if line:
            yield line
        else:
            # tail() is a generator function: calling it without iterating
            # does nothing and leaves this loop spinning at end of file.
            # Delegate to it so its lines (and its sleep) take effect.
            yield from tail(fin)
def tail(fin):
    """Follow *fin* forever, yielding each new line as it becomes available.

    When readline() returns nothing, the generator sleeps for one second,
    seeks back to the position it had before the read, and tries again.
    """
    while True:
        position = fin.tell()
        chunk = fin.readline()
        if chunk:
            yield chunk
            continue
        # Nothing new yet: pause, then retry from the remembered offset.
        time.sleep(1)
        fin.seek(position)
# State shared by every parse() call: the timestamps of recent geo lookups
# plus, keyed by IP address, the countries already resolved.
ipNations = {"queryCounter": []}
# Running index used to give each output file a distinct name.
i = 0
with open("logline.log") as logFile:
    for logLine in readlines_then_tail(logFile):
        target = "input/jsonizedlog-" + str(i) + ".json"
        # One JSON document is written per log line.
        with open(target, "w") as jsonfile:
            json.dump(parse(logLine, ipNations), jsonfile)
        i += 1
我有一些 rsyslog iptables 日志,每一行都是这样的:
Jul 2 06:24:39 mizar kernel: [1746506.948083] NETFILTER: IN=ens192 OUT= MAC=00:0c:29:7d:e8:00:a4:91:b1:e2:a3:34:08:00 SRC=89.248.165.203 DST=192.168.255.2 LEN=44 TOS=0x00 PREC=0x00 TTL=249 ID=9739 PROTO=TCP SPT=44587 DPT=12005 WINDOW=1024 RES=0x00 SYN URGP=0
是否有 bash、python 或其他方式,可以将其转换为扁平的 json 文件?每个标签都必须显示出来,例如:
{ "host": [
{ "name": "mizar",
"timestamp": "Jul 2 06:24:39",
"in": "ens192",
"out": "",
"src": "89.248.165.203",
"dst": "192.168.255.2",
... omissis...
}
] }
每个字段都必须转换。
谢谢!
进一步参考:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Jul 18 11:12:29 2021
@author: mf18626
"""
import json
import ipaddress
import urllib.request
import time
# Endpoint queried for the country of a public IP address.
_GEO_IP_API_URL = "http://ip-api.com/json/"
# Sliding-window throttle: at most _GEO_MAX_QUERIES lookups are started per
# _GEO_WINDOW_SECONDS (the original script used the same 44 / 60 values).
_GEO_MAX_QUERIES = 44
_GEO_WINDOW_SECONDS = 60
# Upper bound, in seconds, on how long a single geo lookup may block.
_GEO_TIMEOUT_SECONDS = 10

# "KEY=value" tokens that are copied into the output, and the field they fill.
_FIELD_BY_KEY = {
    "in": "inIface",
    "out": "outIface",
    "src": "srcIP",
    "dst": "dstIP",
    "proto": "proto",
    "spt": "spt",
    "dpt": "dpt",
}
# Keys whose value is an IP address that also gets a location field.
_LOCATION_FIELD_BY_KEY = {"src": "srcLocation", "dst": "dstLocation"}
# Bare tokens (no "=") that are recorded as flags.
_TCP_FLAGS = ("ack", "syn", "psh", "fin", "rst")


def _throttle_geo_queries(ipNations):
    """Wait until one more geo query fits in the rate window, then record it."""
    query_times = ipNations.setdefault("queryCounter", [])
    now = int(time.time())
    # Keep only timestamps that are still inside the window; anything that is
    # not a number (e.g. a stray debug marker) is discarded as well.
    recent = [
        stamp for stamp in query_times
        if isinstance(stamp, (int, float)) and now - stamp < _GEO_WINDOW_SECONDS
    ]
    if len(recent) >= _GEO_MAX_QUERIES:
        # Timestamps are appended chronologically, so recent[0] is the oldest;
        # once it leaves the window there is room for one more query.
        wait = _GEO_WINDOW_SECONDS - (now - recent[0])
        print("GEO IP API limit reached: throttling %d seconds" % wait)
        time.sleep(wait)
        now = int(time.time())
        recent = [stamp for stamp in recent if now - stamp < _GEO_WINDOW_SECONDS]
    recent.append(now)
    # Update in place so any other reference to the list sees the same data.
    query_times[:] = recent


def _lookup_location(ip_text, ipNations):
    """Return the location label for *ip_text*, using *ipNations* as a cache.

    Returns "IANA PrivNet" for private addresses, the cached or freshly
    fetched country for public ones, and "" when the text is not a valid IP
    address or the lookup yields no country.
    """
    try:
        address = ipaddress.ip_address(ip_text)
    except ValueError:
        # Empty or malformed value (e.g. "SRC="): nothing to locate.
        return ""
    if address.is_private:
        return "IANA PrivNet"
    if ip_text in ipNations:
        return ipNations[ip_text]

    _throttle_geo_queries(ipNations)
    print("Requesting %s" % ip_text)
    try:
        with urllib.request.urlopen(
            _GEO_IP_API_URL + ip_text, timeout=_GEO_TIMEOUT_SECONDS
        ) as response:
            payload = json.loads(response.read().decode("utf-8"))
    except (OSError, ValueError) as error:
        # URLError/timeouts are OSError subclasses; bad JSON is a ValueError.
        # A failed lookup must not abort the parsing of the log line.
        print("GEO IP lookup failed for %s: %s" % (ip_text, error))
        return ""
    country = payload.get("country") if isinstance(payload, dict) else None
    if country is None:
        # The response carried no "country" entry; do not cache the failure.
        return ""
    ipNations[ip_text] = country
    return country


def parse(logLine, ipNations):
    """Convert one iptables syslog line into a flat dict of string fields.

    Parameters:
        logLine: the raw log line; it is split on whitespace.
        ipNations: mutable dict shared between calls. Its "queryCounter"
            entry holds the timestamps of recent geo lookups (used for
            throttling); every other key is an IP address mapped to the
            country already resolved for it.

    Returns:
        A dict with the keys timestamp, eventId, inIface, outIface, srcIP,
        dstIP, spt, dpt, proto, syn, ack, psh, fin, rst, srcLocation and
        dstLocation. Fields that the line does not provide stay "".
        Values taken from "KEY=value" tokens are lower-cased.

    A token is matched on its exact key (the text before "="), so e.g.
    "PHYSIN=" is not mistaken for "IN=". When a key occurs more than once
    the last occurrence wins. Lines with too few tokens no longer raise;
    the missing positional fields are simply left empty.
    """
    tokens = logLine.split()

    # Initialise every field up front: they are common to all protocols, and
    # a given log line only populates some of them.
    fields = {
        "timestamp": "",
        "eventId": "",
        "inIface": "",
        "outIface": "",
        "srcIP": "",
        "dstIP": "",
        "spt": "",
        "dpt": "",
        "proto": "",
        "syn": "",
        "ack": "",
        "psh": "",
        "fin": "",
        "rst": "",
        "srcLocation": "",
        "dstLocation": "",
    }

    # The timestamp is always the first token.
    if tokens:
        fields["timestamp"] = tokens[0]
    # The fourth token is always used as the identifier, for later ordering
    # in Elastic.
    if len(tokens) > 3:
        fields["eventId"] = tokens[3]

    for raw_token in tokens:
        # Square brackets wrap some tokens (e.g. "[SRC=..."); drop them so
        # such tokens are still recognised by key.
        token = raw_token.lower().strip("[]")
        key, separator, value = token.partition("=")
        if not separator:
            if key in _TCP_FLAGS:
                fields[key] = key
            continue
        field_name = _FIELD_BY_KEY.get(key)
        if field_name is None:
            continue
        fields[field_name] = value
        location_field = _LOCATION_FIELD_BY_KEY.get(key)
        if location_field is not None:
            fields[location_field] = _lookup_location(value, ipNations)
    return fields
def readlines_then_tail(fin):
    """Yield every line already in *fin*, then keep following it for new ones.

    Parameters:
        fin: an open file object supporting readline(), tell() and seek().

    Yields:
        Each line as returned by readline(). Once the end of the file is
        reached, iteration is handed over to tail(), which polls for
        appended lines, so this generator never finishes on its own.
    """
    while True:
        line = fin.readline()
        if line:
            yield line
        else:
            # tail() is a generator function: calling it without iterating
            # does nothing and leaves this loop spinning at end of file.
            # Delegate to it so its lines (and its sleep) take effect.
            yield from tail(fin)
def tail(fin):
    """Follow *fin* forever, yielding each new line as it becomes available.

    When readline() returns nothing, the generator sleeps for one second,
    seeks back to the position it had before the read, and tries again.
    """
    while True:
        position = fin.tell()
        chunk = fin.readline()
        if chunk:
            yield chunk
            continue
        # Nothing new yet: pause, then retry from the remembered offset.
        time.sleep(1)
        fin.seek(position)
# State shared by every parse() call: the timestamps of recent geo lookups
# plus, keyed by IP address, the countries already resolved.
ipNations = {"queryCounter": []}
# Running index used to give each output file a distinct name.
i = 0
with open("logline.log") as logFile:
    for logLine in readlines_then_tail(logFile):
        target = "input/jsonizedlog-" + str(i) + ".json"
        # One JSON document is written per log line.
        with open(target, "w") as jsonfile:
            json.dump(parse(logLine, ipNations), jsonfile)
        i += 1