CloudWatch 日志流到 Lambda python
CloudWatch logs stream to Lambda python
我在 CloudWatch 日志组中创建了一个订阅过滤器并将其流式传输到我的 lambda 函数,但我的 lambda 函数出现错误。
代码:
import boto3
import binascii
import json
import base64
import zlib
def stream_gzip_decompress(stream):
dec = zlib.decompressobj(32 + zlib.MAX_WBITS) # offset 32 to skip the header
foo=''
for chunk in stream:
rv = dec.decompress(chunk)
if rv:
foo += rv
return foo
def lambda_handler(event, context):
# Decode and decompress the AWS Log stream to extract json object
stream=json.dumps(event['awslogs']['data'])
f = base64.b64decode(stream)
payload=json.loads(stream_gzip_decompress(f.decode(f)))
print(payload)
错误:
回复:
{
"errorMessage": "decode() argument 1 must be str, not bytes",
"errorType": "TypeError",
"stackTrace": [
[
"/var/task/lambda_function.py",
34,
"lambda_handler",
"payload=json.loads(stream_gzip_decompress(f.decode(f)))"
]
]
}
任何帮助或线索将不胜感激!如果您有任何替代解决方案,请提出建议。我的要求是使用 lambda 处理来自 CloudWatch 的日志。
提前致谢!!
以下是我在处理发送到 AWS Lambda 的 CloudWatch 日志时通常遵循的大纲。
import gzip
import json
from StringIO import StringIO
def lambda_handler(event, context):
cw_data = str(event['awslogs']['data'])
cw_logs = gzip.GzipFile(fileobj=StringIO(cw_data.decode('base64', 'strict'))).read()
log_events = json.loads(cw_logs)
for log_event in logevents['logEvents']:
# Process Logs
我看到您将发送到 AWS Lambda 的数据视为 JSON 对象。您首先要进行 base64 解码,然后解压缩数据。解码和解压缩后,您应该拥有包含日志信息的 JSON 对象。
如果其他人正在寻求有关此主题的帮助。
我采取了稍微不同的方法,但我确实在事件中看到了一个 'awslog' 键。
这是我成功的示例。
Python 3.6 拉姆达。
设置 cloudwatch 触发器以调用 lambda
import gzip
import json
import base64
def lambda_handler(event, context):
print(f'Logging Event: {event}')
print(f"Awslog: {event['awslogs']}")
cw_data = event['awslogs']['data']
print(f'data: {cw_data}')
print(f'type: {type(cw_data)}')
compressed_payload = base64.b64decode(cw_data)
uncompressed_payload = gzip.decompress(compressed_payload)
payload = json.loads(uncompressed_payload)
log_events = payload['logEvents']
for log_event in log_events:
print(f'LogEvent: {log_event}')
这是类星体的答案转换为Python 3.
import gzip
import json
import base64
from io import BytesIO
cw_data = str(event['awslogs']['data'])
cw_logs = gzip.GzipFile(fileobj=BytesIO(base64.b64decode(cw_data, validate=True))).read()
log_events = json.loads(cw_logs)
for log_event in log_events['logEvents']:
# Process Logs
主要变化是使用 io.BytesIO 和不同的 base64 解码函数来获取日志事件数据。
我在 CloudWatch 日志组中创建了一个订阅过滤器并将其流式传输到我的 lambda 函数,但我的 lambda 函数出现错误。
代码:
import boto3
import binascii
import json
import base64
import zlib
def stream_gzip_decompress(stream):
dec = zlib.decompressobj(32 + zlib.MAX_WBITS) # offset 32 to skip the header
foo=''
for chunk in stream:
rv = dec.decompress(chunk)
if rv:
foo += rv
return foo
def lambda_handler(event, context):
# Decode and decompress the AWS Log stream to extract json object
stream=json.dumps(event['awslogs']['data'])
f = base64.b64decode(stream)
payload=json.loads(stream_gzip_decompress(f.decode(f)))
print(payload)
错误:
回复:
{
"errorMessage": "decode() argument 1 must be str, not bytes",
"errorType": "TypeError",
"stackTrace": [
[
"/var/task/lambda_function.py",
34,
"lambda_handler",
"payload=json.loads(stream_gzip_decompress(f.decode(f)))"
]
]
}
任何帮助或线索将不胜感激!如果您有任何替代解决方案,请提出建议。我的要求是使用 lambda 处理来自 CloudWatch 的日志。
提前致谢!!
以下是我在处理发送到 AWS Lambda 的 CloudWatch 日志时通常遵循的大纲。
import gzip
import json
from StringIO import StringIO
def lambda_handler(event, context):
cw_data = str(event['awslogs']['data'])
cw_logs = gzip.GzipFile(fileobj=StringIO(cw_data.decode('base64', 'strict'))).read()
log_events = json.loads(cw_logs)
for log_event in logevents['logEvents']:
# Process Logs
我看到您将发送到 AWS Lambda 的数据视为 JSON 对象。您首先要进行 base64 解码,然后解压缩数据。解码和解压缩后,您应该拥有包含日志信息的 JSON 对象。
如果其他人正在寻求有关此主题的帮助。
我采取了稍微不同的方法,但我确实在事件中看到了一个 'awslog' 键。
这是我成功的示例。 Python 3.6 拉姆达。 设置 cloudwatch 触发器以调用 lambda
import gzip
import json
import base64
def lambda_handler(event, context):
print(f'Logging Event: {event}')
print(f"Awslog: {event['awslogs']}")
cw_data = event['awslogs']['data']
print(f'data: {cw_data}')
print(f'type: {type(cw_data)}')
compressed_payload = base64.b64decode(cw_data)
uncompressed_payload = gzip.decompress(compressed_payload)
payload = json.loads(uncompressed_payload)
log_events = payload['logEvents']
for log_event in log_events:
print(f'LogEvent: {log_event}')
这是类星体的答案转换为Python 3.
import gzip
import json
import base64
from io import BytesIO
cw_data = str(event['awslogs']['data'])
cw_logs = gzip.GzipFile(fileobj=BytesIO(base64.b64decode(cw_data, validate=True))).read()
log_events = json.loads(cw_logs)
for log_event in log_events['logEvents']:
# Process Logs
主要变化是使用 io.BytesIO 和不同的 base64 解码函数来获取日志事件数据。