正在解析用户 table 使用情况的 postgres 日志
parsing postgres logs for table usage by user
我正在对现有数据库 table 的使用量以及哪些用户作为数据库清理工作的一部分进行审计。使用日志文件似乎是获取这些数据的自然方式。我们有 pgBadger 运行 用于性能报告,但我所描述的使用情况报告不存在。有谁知道可以从日志中提取 table 和用户信息以便我可以计算其摘要统计信息的工具(pgBadger 或其他工具)?我想利用现有工具而不是滚动我自己的日志解析器。
我最终写了一个 hacky 日志解析器。
import re
import psqlparse
import json
statement_re = r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} UTC:[^:]*:(?P<user>\w+)@(?P<db>\w+):.*statement:\s+(?P<stmt>.*)"
log_re = r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}"
def parse_logs(log_file):
with open(log_file, 'r') as f:
state = 'looking'
info = None
for line in f:
if state == 'found':
if re.match(log_re, line) is None:
info['stmt'].append(line.strip())
else:
info['stmt'] = "\n".join(info['stmt']).strip()
try:
parsed_stmt = psqlparse.parse(info['stmt'])[0]
info['stmt_type'] = str(type(parsed_stmt)).split(".")[-1][0:-6].lower()
info['tables'] = list(parsed_stmt.tables())
except:
pass
print(json.dumps(info))
state = 'looking'
if state == 'looking':
m = re.match(statement_re, line)
if m is not None:
stmt = m.group('stmt')
if stmt not in {'BEGIN', 'COMMIT', 'ROLLBACK', 'SELECT 1'} and 'show' not in stmt:
info = {'user': m.group('user'), 'stmt': [stmt]}
state = 'found'
parse_logs('postgresql.log')
我正在对现有数据库 table 的使用量以及哪些用户作为数据库清理工作的一部分进行审计。使用日志文件似乎是获取这些数据的自然方式。我们有 pgBadger 运行 用于性能报告,但我所描述的使用情况报告不存在。有谁知道可以从日志中提取 table 和用户信息以便我可以计算其摘要统计信息的工具(pgBadger 或其他工具)?我想利用现有工具而不是滚动我自己的日志解析器。
我最终写了一个 hacky 日志解析器。
import re
import psqlparse
import json
statement_re = r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} UTC:[^:]*:(?P<user>\w+)@(?P<db>\w+):.*statement:\s+(?P<stmt>.*)"
log_re = r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}"
def parse_logs(log_file):
with open(log_file, 'r') as f:
state = 'looking'
info = None
for line in f:
if state == 'found':
if re.match(log_re, line) is None:
info['stmt'].append(line.strip())
else:
info['stmt'] = "\n".join(info['stmt']).strip()
try:
parsed_stmt = psqlparse.parse(info['stmt'])[0]
info['stmt_type'] = str(type(parsed_stmt)).split(".")[-1][0:-6].lower()
info['tables'] = list(parsed_stmt.tables())
except:
pass
print(json.dumps(info))
state = 'looking'
if state == 'looking':
m = re.match(statement_re, line)
if m is not None:
stmt = m.group('stmt')
if stmt not in {'BEGIN', 'COMMIT', 'ROLLBACK', 'SELECT 1'} and 'show' not in stmt:
info = {'user': m.group('user'), 'stmt': [stmt]}
state = 'found'
parse_logs('postgresql.log')