EKS 以字符串形式记录到 CloudWatch 流
EKS logs to CloudWatch stream as string
我遇到了这个问题,我有一个将日志发送到 Cloudwatch 的 EKS 集群,然后 Firehose 将日志流式传输到 s3 存储桶。
我的目标是从 s3 中获取这些日志并将它们批量转发到 elasticsearch。
我写了一个 python lambda 函数,当日志是 json 时它工作得很好。
我的问题是有些日志是字符串或 "kind of" JSON.
示例:
kube-authenticator :
time="2019-09-13T09:30:50Z" level=error msg="Watch channel closed."
kube-api 服务器:
E0912 10:19:10.649757 1 watcher.go:208] watch chan error: etcdserver: mvcc: required revision has been compacted
我想知道我是否应该尝试包装这些消息并将它们转换为 JSON 或者有什么方法可以将日志格式更改为 JSON.I 考虑过编写正则表达式,但我没有没有足够的正则表达式知识。
如评论中所述,最终编写了 2 个处理日志并将其转换为 JSON 的函数。
第一个句柄 kube-apiserver,kube-controller-manager and kube-scheduler
日志组:
def convert_text_logs_to_json_and_add_logGroup(message,logGroup):
month_and_day = message.split(' ')[0][1:]
month_and_day = insert_dash(month_and_day,2)
log_time_regex = r"\s+((?:\d{2})?:\d{1,2}:\d{1,2}.\d{1,})"
log_time = re.findall(log_time_regex, message)[0]
currentYear = datetime.now().year
full_log_datetime = "%s-%sT%sZ" %(currentYear,month_and_day,log_time)
log_contnet = (re.split(log_time_regex,message)[2])
message = '{"timestamp": "%s", "message":"%s","logGroup" :"%s"}' %(full_log_datetime,log_contnet.replace('"',''),logGroup)
return message
第二个函数处理 authenticator
日志组:
def chunkwise(array, size=2):
it = iter(array)
return izip(*[it]*size)
def wrap_text_to_json_and_add_logGroup(message,logGroup):
regex = r"\".*?\"|\w+"
matches = re.findall(regex, message)
key_value_pairs = chunkwise(matches)
json_message= {}
for key_value in key_value_pairs:
key = key_value[0]
if key == 'time':
key = 'timestamp'
value = key_value[1].replace('"','')
json_message[key] = value
json_message['logGroup'] = logGroup
log_to_insert = json.dumps(json_message)
return log_to_insert
我希望这些功能对那些可能需要将日志从 cloudwatch 插入到 elasticsearch 的人有用。
我遇到了这个问题,我有一个将日志发送到 Cloudwatch 的 EKS 集群,然后 Firehose 将日志流式传输到 s3 存储桶。
我的目标是从 s3 中获取这些日志并将它们批量转发到 elasticsearch。 我写了一个 python lambda 函数,当日志是 json 时它工作得很好。 我的问题是有些日志是字符串或 "kind of" JSON.
示例:
kube-authenticator :
time="2019-09-13T09:30:50Z" level=error msg="Watch channel closed."
kube-api 服务器:
E0912 10:19:10.649757 1 watcher.go:208] watch chan error: etcdserver: mvcc: required revision has been compacted
我想知道我是否应该尝试包装这些消息并将它们转换为 JSON 或者有什么方法可以将日志格式更改为 JSON.I 考虑过编写正则表达式,但我没有没有足够的正则表达式知识。
如评论中所述,最终编写了 2 个处理日志并将其转换为 JSON 的函数。
第一个句柄 kube-apiserver,kube-controller-manager and kube-scheduler
日志组:
def convert_text_logs_to_json_and_add_logGroup(message,logGroup):
month_and_day = message.split(' ')[0][1:]
month_and_day = insert_dash(month_and_day,2)
log_time_regex = r"\s+((?:\d{2})?:\d{1,2}:\d{1,2}.\d{1,})"
log_time = re.findall(log_time_regex, message)[0]
currentYear = datetime.now().year
full_log_datetime = "%s-%sT%sZ" %(currentYear,month_and_day,log_time)
log_contnet = (re.split(log_time_regex,message)[2])
message = '{"timestamp": "%s", "message":"%s","logGroup" :"%s"}' %(full_log_datetime,log_contnet.replace('"',''),logGroup)
return message
第二个函数处理 authenticator
日志组:
def chunkwise(array, size=2):
it = iter(array)
return izip(*[it]*size)
def wrap_text_to_json_and_add_logGroup(message,logGroup):
regex = r"\".*?\"|\w+"
matches = re.findall(regex, message)
key_value_pairs = chunkwise(matches)
json_message= {}
for key_value in key_value_pairs:
key = key_value[0]
if key == 'time':
key = 'timestamp'
value = key_value[1].replace('"','')
json_message[key] = value
json_message['logGroup'] = logGroup
log_to_insert = json.dumps(json_message)
return log_to_insert
我希望这些功能对那些可能需要将日志从 cloudwatch 插入到 elasticsearch 的人有用。