从文本文件中读取数据,在特定时间段写入 Python
Reading data from a text file, written in a specific period of time in Python
为了详细解释,我有一个文本文件,我在其中记录了来自不同数量的流程实例的一些数据(即可能有 4 到 16 个流程实例生成日志)。
所有实例按以下格式写入一个文件:
2018-09-07 11:34:47,251 - AppLog - INFO -
******************************************
Log Report - Consume Cycle jhTyjs-ConsumeCycle
******************************************
Uptime: 144708.62724542618s
Jobs Run: 16866
Jobs Current: 1
Q Avg Read Time: 0
Q Msgs Read: 0
Worker Load: ['1.00', '1.00', '1.00']
******************************************
2018-09-07 11:37:47,439 - AppLog - INFO -
******************************************
Log Report - Consume Cycle aftTys-ConsumeCycle
******************************************
Uptime: 144888.81490063667s
Jobs Run: 16866
Jobs Current: 1
Q Avg Read Time: 0
Q Msgs Read: 0
Worker Load: ['1.00', '1.00', '1.00']
******************************************
This is an error line which could be generated by any of the instances and can be anything, <br> like qfuigeececevwovw or wefebew efeofweffhw v wihv or any python \n exception or aiosfgd ceqic eceewfi
2018-09-07 11:40:47,615 - AppLog - INFO -
******************************************
Log Report - Consume Cycle hdyGid-ConsumeCycle
******************************************
Uptime: 145068.99103808403s
Jobs Run: 16866
Jobs Current: 1
Q Avg Read Time: 0
Q Msgs Read: 0
Worker Load: ['1.00', '1.00', '1.00']
******************************************
(在每条日志的Log Report - Consume Cycle [placeholder]-ConsumeCycle
中,[placeholder]
是随机的)
因此,我的文件由大量上述格式的日志组成,一个接一个。每个实例每 3 分钟生成一次日志。
(即所有实例在 3 分钟内只生成一个日志)
如果任何实例出现错误,它们也会将其记录在同一个文件中。所以数据结构根本不一致。
现在,我必须从所有实例中获取最后记录的数据,即最后 3 分钟,并对它们执行一些任务。
有什么方法可以将最近 3 分钟的数据写入日志文件(是错误还是上述格式的完美日志)?
[EDIT] 在日志之间添加了错误行
您可以在
进行拆分
******************************************\n\n
和
record_list = file_Obj.read().split("******************************************\n\n")
这将为您提供列表中的每条独立记录。您可能需要转义反斜杠。您可以通过切片来获取列表的最后一个元素。
print(record_list[-1])
既然你说过文件不会太大而无法处理你不需要任何花哨的东西来筛选它(即从后面缓冲读取) - 你可以遍历整个文件,收集个人日志条目并丢弃超过 3 分钟前发生的条目。
这特别容易,因为您的条目在开始时的日期时间明显不同,而且您的日志日期格式是 ISO-8601 格式,因此您甚至不需要解析日期 - 您可以使用直接字典序比较。
因此,一种方法是:
import datetime
# if your datetime is in UTC use datetime.datetime.utcnow() instead
threshold = datetime.datetime.now() - datetime.timedelta(minutes=3) # 3m ago
# turn it into a ISO-8601 string
threshold_cmp = threshold.strftime("%Y-%m-%d %H:%M:%S") # we'll ignore the milliseconds
entries = []
with open("path/to/your.log") as f: # open your log for reading
current_date = ""
current_entry = ""
for line in f: # iterate over it line-by-line
if line[0].isdigit(): # beginning of a (new) log entry
# store the previous entry if newer than 3 minutes
if current_date >= threshold_cmp: # store the previous entry if newer than 3m
entries.append(current_entry)
current_date = line[:19] # store the date of this (new) entry
current_entry = "" # (re)initialize the entry
current_entry += line # add the current line to the cached entry
if current_entry and current_date >= threshold_cmp: # store the leftovers, if any
entries.append(current_entry)
# now the list 'entries' contains individual entries that occurred in the past 3 minutes
print("".join(entries)) # print them out, or do whatever you want with them
您可以通过区分占位符使这更容易,但您已经说过它是随机的,因此您必须依赖日期时间。
为了详细解释,我有一个文本文件,我在其中记录了来自不同数量的流程实例的一些数据(即可能有 4 到 16 个流程实例生成日志)。
所有实例按以下格式写入一个文件:
2018-09-07 11:34:47,251 - AppLog - INFO -
******************************************
Log Report - Consume Cycle jhTyjs-ConsumeCycle
******************************************
Uptime: 144708.62724542618s
Jobs Run: 16866
Jobs Current: 1
Q Avg Read Time: 0
Q Msgs Read: 0
Worker Load: ['1.00', '1.00', '1.00']
******************************************
2018-09-07 11:37:47,439 - AppLog - INFO -
******************************************
Log Report - Consume Cycle aftTys-ConsumeCycle
******************************************
Uptime: 144888.81490063667s
Jobs Run: 16866
Jobs Current: 1
Q Avg Read Time: 0
Q Msgs Read: 0
Worker Load: ['1.00', '1.00', '1.00']
******************************************
This is an error line which could be generated by any of the instances and can be anything, <br> like qfuigeececevwovw or wefebew efeofweffhw v wihv or any python \n exception or aiosfgd ceqic eceewfi
2018-09-07 11:40:47,615 - AppLog - INFO -
******************************************
Log Report - Consume Cycle hdyGid-ConsumeCycle
******************************************
Uptime: 145068.99103808403s
Jobs Run: 16866
Jobs Current: 1
Q Avg Read Time: 0
Q Msgs Read: 0
Worker Load: ['1.00', '1.00', '1.00']
******************************************
(在每条日志的Log Report - Consume Cycle [placeholder]-ConsumeCycle
中,[placeholder]
是随机的)
因此,我的文件由大量上述格式的日志组成,一个接一个。每个实例每 3 分钟生成一次日志。
(即所有实例在 3 分钟内只生成一个日志)
如果任何实例出现错误,它们也会将其记录在同一个文件中。所以数据结构根本不一致。
现在,我必须从所有实例中获取最后记录的数据,即最后 3 分钟,并对它们执行一些任务。
有什么方法可以将最近 3 分钟的数据写入日志文件(是错误还是上述格式的完美日志)?
[EDIT] 在日志之间添加了错误行
您可以在
进行拆分******************************************\n\n
和
record_list = file_Obj.read().split("******************************************\n\n")
这将为您提供列表中的每条独立记录。您可能需要转义反斜杠。您可以通过切片来获取列表的最后一个元素。
print(record_list[-1])
既然你说过文件不会太大而无法处理你不需要任何花哨的东西来筛选它(即从后面缓冲读取) - 你可以遍历整个文件,收集个人日志条目并丢弃超过 3 分钟前发生的条目。
这特别容易,因为您的条目在开始时的日期时间明显不同,而且您的日志日期格式是 ISO-8601 格式,因此您甚至不需要解析日期 - 您可以使用直接字典序比较。
因此,一种方法是:
import datetime
# if your datetime is in UTC use datetime.datetime.utcnow() instead
threshold = datetime.datetime.now() - datetime.timedelta(minutes=3) # 3m ago
# turn it into a ISO-8601 string
threshold_cmp = threshold.strftime("%Y-%m-%d %H:%M:%S") # we'll ignore the milliseconds
entries = []
with open("path/to/your.log") as f: # open your log for reading
current_date = ""
current_entry = ""
for line in f: # iterate over it line-by-line
if line[0].isdigit(): # beginning of a (new) log entry
# store the previous entry if newer than 3 minutes
if current_date >= threshold_cmp: # store the previous entry if newer than 3m
entries.append(current_entry)
current_date = line[:19] # store the date of this (new) entry
current_entry = "" # (re)initialize the entry
current_entry += line # add the current line to the cached entry
if current_entry and current_date >= threshold_cmp: # store the leftovers, if any
entries.append(current_entry)
# now the list 'entries' contains individual entries that occurred in the past 3 minutes
print("".join(entries)) # print them out, or do whatever you want with them
您可以通过区分占位符使这更容易,但您已经说过它是随机的,因此您必须依赖日期时间。