Parsing log files and finding appropriate delimiters

Suppose I have a log file that looks like this:

'2021-05-18T14:01:13Z UTC [ db=dev user=rdsdb pid=11593 userid=1 xid=19771457 ]' LOG: BEGIN;
'2021-05-18T14:01:13Z UTC [ db=dev user=rdsdb pid=11593 userid=1 xid=19771457 ]' LOG: SET datestyle TO ISO;
'2021-05-18T14:01:13Z UTC [ db=dev user=rdsdb pid=11593 userid=1 xid=19771457 ]' LOG: SET TRANSACTION READ ONLY;
'2021-05-18T14:01:13Z UTC [ db=dev user=rdsdb pid=11593 userid=1 xid=19771457 ]' LOG: SET STATEMENT_TIMEOUT TO 300000;
'2021-05-18T14:01:13Z UTC [ db=dev user=rdsdb pid=11593 userid=1 xid=19771457 ]' LOG: /* hash: f71f47211eca32d63469fba576bbbb19 */

SELECT TRIM(application_name) AS application_name
     , MAX(recordtime)        AS last_used
FROM stl_connection_log
WHERE dbname <> 'dev'
  AND username <> 'rdsdb'
  AND (   application_name LIKE 'RedshiftUserLastLogin-v%'
       OR application_name LIKE 'RedshiftSystemTablePersistence-v%'
       OR application_name LIKE 'AnalyzeVacuumUtility-v%'
       OR application_name LIKE 'ColumnEncodingUtility-v%' )
GROUP BY application_name
LIMIT 50
;
'2021-05-18T14:01:13Z UTC [ db=dev user=rdsdb pid=11593 userid=1 xid=19771457 ]' LOG: SELECT btrim( pg_catalog.stll_connection_log.application_name ) AS application_name, MAX(pg_catalog.stll_connection_log.recordtime) AS last_used FROM pg_catalog.stll_connection_log WHERE pg_catalog.stll_connection_log.dbname <> 'dev'::Char(3) AND pg_catalog.stll_connection_log.username <> 'rdsdb'::Char(5) AND (pg_catalog.stll_connection_log.application_name LIKE 'AnalyzeVacuumUtility-v%' OR pg_catalog.stll_connection_log.application_name LIKE 'ColumnEncodingUtility-v%' OR pg_catalog.stll_connection_log.application_name LIKE 'RedshiftSystemTablePersistence-v%' OR pg_cata

I want to read in each "row" that begins with the quote + timestamp. Every row starts with '2021-05-18T14:01:13Z UTC [ db=dev user=rdsdb pid=11593 userid=1 xid=19771457 ]' (call this the row delimiter), and I then want to split each row into its columns (query, pid, user, db, etc.). What is the simplest way to do this?

The problem is that the row delimiter does not appear at every newline. As you can see, there is one "row" whose query spans multiple newlines, so when reading the text file in Python there will be several physical lines without the delimiter. Does that mean that as I read lines from the file I need to check whether each one starts with the row delimiter, and if it does not, keep appending lines in memory until the next row delimiter is reached? (A rough sketch of that idea is below.)
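If it helps, here is a minimal sketch of that accumulate-until-the-next-delimiter idea; the starts_row pattern and the rows helper are illustrative names, not anything prescribed by the log format:

import re

# Assumption: a logical row starts with a quoted ISO timestamp such as '2021-05-18T14:01:13Z UTC
starts_row = re.compile(r"'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z UTC")

def rows(path):
    """Yield one list of physical lines per logical log row."""
    current = []
    with open(path) as f:
        for line in f:
            # A new delimiter line closes the previous logical row.
            if starts_row.match(line) and current:
                yield current
                current = []
            current.append(line)
    if current:
        yield current  # flush the final row at end of file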

Ideally, the row:

'2021-05-18T14:01:13Z UTC [ db=dev user=rdsdb pid=11593 userid=1 xid=19771457 ]' LOG: /* hash: f71f47211eca32d63469fba576bbbb19 */

SELECT TRIM(application_name) AS application_name
     , MAX(recordtime)        AS last_used
FROM stl_connection_log
WHERE dbname <> 'dev'
  AND username <> 'rdsdb'
  AND (   application_name LIKE 'RedshiftUserLastLogin-v%'
       OR application_name LIKE 'RedshiftSystemTablePersistence-v%'
       OR application_name LIKE 'AnalyzeVacuumUtility-v%'
       OR application_name LIKE 'ColumnEncodingUtility-v%' )
GROUP BY application_name
LIMIT 50
;

would be broken out into a single CSV row where:

timestamp = 2021-05-18T14:01:13Z UTC
db = dev
user = rdsdb
pid = 11593
userid = 1
xid = 19771457
query = `SELECT TRIM(application_name) AS application_name, MAX(recordtime) AS last_used FROM stl_connection_log WHERE dbname <> 'dev' AND username <> 'rdsdb' AND (application_name LIKE 'RedshiftUserLastLogin-v%' OR application_name LIKE 'RedshiftSystemTablePersistence-v%' OR application_name LIKE 'AnalyzeVacuumUtility-v%' OR application_name LIKE 'ColumnEncodingUtility-v%') GROUP BY application_name LIMIT 50;`

and the row:

'2021-05-18T14:01:13Z UTC [ db=dev user=rdsdb pid=11593 userid=1 xid=19771457 ]' LOG: BEGIN;

would become:

timestamp = 2021-05-18T14:01:13Z UTC
db = dev
user = rdsdb
pid = 11593
userid = 1
xid = 19771457
query = `LOG: BEGIN;`

You can use a regular expression to identify the lines that act as row delimiters, then use groupby() to read alternating groups of delimiter lines and query lines:

from itertools import groupby
import re
import csv

# Capture timestamp, db, user, pid, userid and xid from a delimiter line.
row_delim = re.compile(r"'(.*? UTC) \[ db=(.*?) user=(.*?) pid=(.*?) userid=(.*?) xid=(.*?) \]'")

with open('logfile.txt') as f_input, open('output.csv', 'w', newline='') as f_output:
    csv_output = csv.writer(f_output)
    csv_output.writerow(["timestamp", "db", "user", "pid", "userid", "xid", "query"])

    # Group consecutive lines by whether they match the delimiter regex.
    for re_row_delim, lines in groupby(f_input, lambda x: row_delim.match(x)):
        if re_row_delim:
            # A delimiter line: remember its captured fields for the query that follows.
            last_delim = re_row_delim.groups()
        else:
            # Non-matching lines form the query; join them into a single string.
            query = [line.strip() for line in lines if line.strip()]
            row = [*last_delim, ' '.join(query)]
            csv_output.writerow(row)

This assumes the order of the parameters never changes. If the order could vary, the bracketed key=value pairs can be parsed into a dict instead, as sketched below.
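A possible way to drop that assumption (the header pattern and the parse_delim helper are illustrative, not part of the script above) is to capture the bracketed section as a whole and parse it as arbitrary key=value pairs:

import re

header = re.compile(r"'(?P<timestamp>.*? UTC) \[ (?P<fields>.*?) \]'")

def parse_delim(line):
    """Parse a delimiter line into a dict, tolerating any key=value order."""
    m = header.match(line)
    if not m:
        return None
    row = {"timestamp": m.group("timestamp")}
    for key, value in re.findall(r"(\w+)=(\S+)", m.group("fields")):
        row[key] = value
    return row

print(parse_delim("'2021-05-18T14:01:13Z UTC [ db=dev user=rdsdb pid=11593 userid=1 xid=19771457 ]' LOG: BEGIN;"))
# {'timestamp': '2021-05-18T14:01:13Z UTC', 'db': 'dev', 'user': 'rdsdb', 'pid': '11593', 'userid': '1', 'xid': '19771457'}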

For your given test file, groupby first returns a truthy re_row_delim holding the regex match result (via the lambda key function). Note that match objects never compare equal to one another, so each delimiter line actually forms its own one-line group; last_delim is simply overwritten each time, and the delimiter immediately preceding the query is the one that sticks. The grouped lines themselves can be ignored here.

On the next iteration it reads all of the non-matching lines, i.e. the query text. In this case re_row_delim is None because the regex fails to match, and since consecutive None keys compare equal, lines now holds all of the query lines as a single group.

So the iterations alternate between reading delimiter lines and reading a block of query lines. A small self-contained demonstration of this grouping behaviour follows.
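The HDR pattern and sample list below are made up purely to show how the grouping behaves:

from itertools import groupby
import re

delim = re.compile(r"HDR")
sample = ["HDR one", "HDR two", "payload a", "payload b", "HDR three"]

for key, group in groupby(sample, delim.match):
    print(bool(key), list(group))

# True ['HDR one']                    <- each match object is unique, one group per header line
# True ['HDR two']
# False ['payload a', 'payload b']    <- None == None, so payload lines form one group
# True ['HDR three']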

Running the first script on your test file gives you a CSV file like this:

timestamp,db,user,pid,userid,xid,query
2021-05-18T14:01:13Z UTC,dev,rdsdb,11593,1,19771457,"SELECT TRIM(application_name) AS application_name , MAX(recordtime)        AS last_used FROM stl_connection_log WHERE dbname <> 'dev' AND username <> 'rdsdb' AND (   application_name LIKE 'RedshiftUserLastLogin-v%' OR application_name LIKE 'RedshiftSystemTablePersistence-v%' OR application_name LIKE 'AnalyzeVacuumUtility-v%' OR application_name LIKE 'ColumnEncodingUtility-v%' ) GROUP BY application_name LIMIT 50 ;"

If you also need to keep the text that appears after LOG: on the timestamp lines, a different approach is needed. The regex can capture that text as well, and a query_start list can collect it from each delimiter line:

from itertools import groupby
import re
import csv

# Same delimiter regex, with a 7th group capturing the text after LOG:.
row_delim = re.compile(r"'(.*? UTC) \[ db=(.*?) user=(.*?) pid=(.*?) userid=(.*?) xid=(.*?) \]' LOG: (.*)")
query_start = []    # holds all text captured after LOG: on delimiter lines

with open('logfile.txt') as f_input, open('output.csv', 'w', newline='') as f_output:
    csv_output = csv.writer(f_output)
    csv_output.writerow(["timestamp", "db", "user", "pid", "userid", "xid", "query"])

    for re_row_delim, lines in groupby(f_input, lambda x: row_delim.match(x)):
        if re_row_delim:
            last_delim = re_row_delim.groups()[:6]   # the six fields, without the LOG: text
            query_start.extend([row_delim.match(line).group(7) for line in lines])
        else:
            # Prepend any collected LOG: text to the multi-line query before writing the row.
            query = [line.strip() for line in [*query_start, *lines] if line.strip()]
            row = [*last_delim, ' '.join(query)]
            csv_output.writerow(row)
            query_start = []   # reset for the next logical row
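
To sanity-check the extended regex on a single line (expected output shown as a comment):

import re

row_delim = re.compile(r"'(.*? UTC) \[ db=(.*?) user=(.*?) pid=(.*?) userid=(.*?) xid=(.*?) \]' LOG: (.*)")
m = row_delim.match("'2021-05-18T14:01:13Z UTC [ db=dev user=rdsdb pid=11593 userid=1 xid=19771457 ]' LOG: BEGIN;")
print(m.groups())
# ('2021-05-18T14:01:13Z UTC', 'dev', 'rdsdb', '11593', '1', '19771457', 'BEGIN;')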