如何将转换后的日志记录索引到 AWS Elasticsearch 中?
How to I index the transformed log records into AWS Elasticsearch?
TLDR
由于 "encoding problem"。
,lambda 函数无法将 firehose 日志索引到 AWS 管理的 ES 中
实际错误响应
当我从 firehose record
对单个 logEvent
进行 base64 编码并将收集的记录发送到 AWS 托管 ES 时,我没有收到任何错误。
有关详细信息,请参阅下一节。
Base 64 编码的压缩有效负载被发送到 ES,因为生成的 json 转换对于 ES 来说太大而无法索引 - see this ES link.
我从 AWS 托管 ES 收到以下错误:
{
"deliveryStreamARN": "arn:aws:firehose:us-west-2:*:deliverystream/*",
"destination": "arn:aws:es:us-west-2:*:domain/*",
"deliveryStreamVersionId": 1,
"message": "The data could not be decoded as UTF-8",
"errorCode": "InvalidEncodingException",
"processor": "arn:aws:lambda:us-west-2:*:function:*"
}
如果输出记录未压缩,the body size is too long
(小至 14MB)。没有压缩和简单的 base64 编码负载,我在 Lambda 日志中收到以下错误:
{
"type": "mapper_parsing_exception",
"reason": "failed to parse",
"caused_by": {
"type": "not_x_content_exception",
"reason": "Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes"
}
}
描述
我有 Cloudwatch 日志,这些日志按大小/间隔进行缓冲,这些日志被送入 Kinesis Firehose。 firehose 将日志传输到 lambda 函数中,该函数将日志转换为 json 记录,然后将其发送到 AWS 管理的 Elasticsearch 集群。
lambda 函数获得以下 JSON 结构:
{
"invocationId": "cf1306b5-2d3c-4886-b7be-b5bcf0a66ef3",
"deliveryStreamArn": "arn:aws:firehose:...",
"region": "us-west-2",
"records": [{
"recordId": "49577998431243709525183749876652374166077260049460232194000000",
"approximateArrivalTimestamp": 1508197563377,
"data": "some_compressed_data_in_base_64_encoding"
}]
}
lambda 函数然后提取 .records[].data
并将数据解码为 base64 并解压缩数据,结果如下 JSON:
{
"messageType": "DATA_MESSAGE",
"owner": "aws_account_number",
"logGroup": "some_cloudwatch_log_group_name",
"logStream": "i-0221b6ec01af47bfb",
"subscriptionFilters": [
"cloudwatch_log_subscription_filter_name"
],
"logEvents": [
{
"id": "33633929427703365813575134502195362621356131219229245440",
"timestamp": 1508197557000,
"message": "Oct 16 23:45:57 some_log_entry_1"
},
{
"id": "33633929427703365813575134502195362621356131219229245441",
"timestamp": 1508197557000,
"message": "Oct 16 23:45:57 some_log_entry_2"
},
{
"id": "33633929427703365813575134502195362621356131219229245442",
"timestamp": 1508197557000,
"message": "Oct 16 23:45:57 some_log_entry_3"
}
]
}
.logEvents[]
中的单个项目被转换为 json 结构,其中键是在 Kibana 中搜索日志时所需的列 - 如下所示:
{
'journalctl_host': 'ip-172-11-11-111',
'process': 'haproxy',
'pid': 15507,
'client_ip': '172.11.11.111',
'client_port': 3924,
'frontend_name': 'http-web',
'backend_name': 'server',
'server_name': 'server-3',
'time_duration': 10,
'status_code': 200,
'bytes_read': 79,
'@timestamp': '1900-10-16T23:46:01.0Z',
'tags': ['haproxy'],
'message': 'HEAD / HTTP/1.1'
}
转换后的 json 被收集到一个数组中,该数组得到 zlib 压缩和 base64 编码的字符串,然后将其转换为新的 json 有效载荷作为最终的 lambda 结果:
{
"records": [
{
"recordId": "49577998431243709525183749876652374166077260049460232194000000",
"result": "Ok",
"data": "base64_encoded_zlib_compressed_array_of_transformed_logs"
}
]}
Cloudwatch 配置
13 个日志条目 (~4kb) 可以转换为大约 635kb。
我还降低了 awslog 的阈值,希望发送到 Lambda 函数的日志的大小会变小:
buffer_duration = 10
batch_count = 10
batch_size = 500
不幸的是,当出现突发时 - 峰值可能超过 2800 行,其中大小超过 1MB。
当 lambda 函数产生的负载为 "too big"(转换后的日志约 13mb)时,lambda cloudwatch 日志中会记录错误 - "body size is too long"。似乎没有任何迹象表明此错误来自何处或 lambda fn 的响应负载是否存在大小限制。
因此,AWS 支持人员告诉我,无法缓解以下限制来解决此流程:
- lambda 负载大小
- 进入 lambda 的压缩 firehose 有效载荷与 lambda 输出成正比。
相反,我将架构修改为以下内容:
- Cloudwatch 日志通过 Firehose 在 S3 中备份。
- S3 事件由 lambda 函数处理。
- lambda 函数 returns 如果 lambda 转换并能够成功地将日志批量索引到 ES,则为成功代码。
- 如果 lambda 函数失败,死信队列 (AWS SQS) 会配置一个 cloudwatch 警报。可以在 here.
中找到示例 cloudformation 片段
- 如果有 SQS 消息,可以使用这些消息手动调用 lambda 函数,或者设置 AWS 批处理作业以使用 lambda 函数处理 SQS 消息。但是,应该小心,lambda 函数不会再次故障转移到 DLQ 中。检查 lambda cloudwatch 日志以检查为什么未处理该消息并将其发送到 DLQ。
TLDR
由于 "encoding problem"。
,lambda 函数无法将 firehose 日志索引到 AWS 管理的 ES 中实际错误响应
当我从 firehose record
对单个 logEvent
进行 base64 编码并将收集的记录发送到 AWS 托管 ES 时,我没有收到任何错误。
有关详细信息,请参阅下一节。
Base 64 编码的压缩有效负载被发送到 ES,因为生成的 json 转换对于 ES 来说太大而无法索引 - see this ES link.
我从 AWS 托管 ES 收到以下错误:
{
"deliveryStreamARN": "arn:aws:firehose:us-west-2:*:deliverystream/*",
"destination": "arn:aws:es:us-west-2:*:domain/*",
"deliveryStreamVersionId": 1,
"message": "The data could not be decoded as UTF-8",
"errorCode": "InvalidEncodingException",
"processor": "arn:aws:lambda:us-west-2:*:function:*"
}
如果输出记录未压缩,the body size is too long
(小至 14MB)。没有压缩和简单的 base64 编码负载,我在 Lambda 日志中收到以下错误:
{
"type": "mapper_parsing_exception",
"reason": "failed to parse",
"caused_by": {
"type": "not_x_content_exception",
"reason": "Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes"
}
}
描述
我有 Cloudwatch 日志,这些日志按大小/间隔进行缓冲,这些日志被送入 Kinesis Firehose。 firehose 将日志传输到 lambda 函数中,该函数将日志转换为 json 记录,然后将其发送到 AWS 管理的 Elasticsearch 集群。
lambda 函数获得以下 JSON 结构:
{
"invocationId": "cf1306b5-2d3c-4886-b7be-b5bcf0a66ef3",
"deliveryStreamArn": "arn:aws:firehose:...",
"region": "us-west-2",
"records": [{
"recordId": "49577998431243709525183749876652374166077260049460232194000000",
"approximateArrivalTimestamp": 1508197563377,
"data": "some_compressed_data_in_base_64_encoding"
}]
}
lambda 函数然后提取 .records[].data
并将数据解码为 base64 并解压缩数据,结果如下 JSON:
{
"messageType": "DATA_MESSAGE",
"owner": "aws_account_number",
"logGroup": "some_cloudwatch_log_group_name",
"logStream": "i-0221b6ec01af47bfb",
"subscriptionFilters": [
"cloudwatch_log_subscription_filter_name"
],
"logEvents": [
{
"id": "33633929427703365813575134502195362621356131219229245440",
"timestamp": 1508197557000,
"message": "Oct 16 23:45:57 some_log_entry_1"
},
{
"id": "33633929427703365813575134502195362621356131219229245441",
"timestamp": 1508197557000,
"message": "Oct 16 23:45:57 some_log_entry_2"
},
{
"id": "33633929427703365813575134502195362621356131219229245442",
"timestamp": 1508197557000,
"message": "Oct 16 23:45:57 some_log_entry_3"
}
]
}
.logEvents[]
中的单个项目被转换为 json 结构,其中键是在 Kibana 中搜索日志时所需的列 - 如下所示:
{
'journalctl_host': 'ip-172-11-11-111',
'process': 'haproxy',
'pid': 15507,
'client_ip': '172.11.11.111',
'client_port': 3924,
'frontend_name': 'http-web',
'backend_name': 'server',
'server_name': 'server-3',
'time_duration': 10,
'status_code': 200,
'bytes_read': 79,
'@timestamp': '1900-10-16T23:46:01.0Z',
'tags': ['haproxy'],
'message': 'HEAD / HTTP/1.1'
}
转换后的 json 被收集到一个数组中,该数组得到 zlib 压缩和 base64 编码的字符串,然后将其转换为新的 json 有效载荷作为最终的 lambda 结果:
{
"records": [
{
"recordId": "49577998431243709525183749876652374166077260049460232194000000",
"result": "Ok",
"data": "base64_encoded_zlib_compressed_array_of_transformed_logs"
}
]}
Cloudwatch 配置
13 个日志条目 (~4kb) 可以转换为大约 635kb。
我还降低了 awslog 的阈值,希望发送到 Lambda 函数的日志的大小会变小:
buffer_duration = 10
batch_count = 10
batch_size = 500
不幸的是,当出现突发时 - 峰值可能超过 2800 行,其中大小超过 1MB。
当 lambda 函数产生的负载为 "too big"(转换后的日志约 13mb)时,lambda cloudwatch 日志中会记录错误 - "body size is too long"。似乎没有任何迹象表明此错误来自何处或 lambda fn 的响应负载是否存在大小限制。
因此,AWS 支持人员告诉我,无法缓解以下限制来解决此流程:
- lambda 负载大小
- 进入 lambda 的压缩 firehose 有效载荷与 lambda 输出成正比。
相反,我将架构修改为以下内容:
- Cloudwatch 日志通过 Firehose 在 S3 中备份。
- S3 事件由 lambda 函数处理。
- lambda 函数 returns 如果 lambda 转换并能够成功地将日志批量索引到 ES,则为成功代码。
- 如果 lambda 函数失败,死信队列 (AWS SQS) 会配置一个 cloudwatch 警报。可以在 here. 中找到示例 cloudformation 片段
- 如果有 SQS 消息,可以使用这些消息手动调用 lambda 函数,或者设置 AWS 批处理作业以使用 lambda 函数处理 SQS 消息。但是,应该小心,lambda 函数不会再次故障转移到 DLQ 中。检查 lambda cloudwatch 日志以检查为什么未处理该消息并将其发送到 DLQ。