用于接收来自 AWS SQS 的消息和 AWS S3 中的批存储的 Logstash 替代方案
Logstash alternative to receive messages from AWS SQS and batch store in AWS S3
我需要能够将日志作为文本文件批量存储在 AWS S3 中,格式适合 JSON-SerDe。
其中一个批处理日志文件在 S3 上的外观示例,日期时间格式为 yyyy-MM-dd HH:mm:ss
非常重要
{"message":"Message number 1","datetime":"2020-12-01 14:37:00"}
{"message":"Message number 2","datetime":"2020-12-01 14:38:00"}
{"message":"Message number 3","datetime":"2020-12-01 14:39:00"}
理想情况下,这些将每 5 秒或当排队的消息达到 50 条时存储在 S3 上,但也是可配置的。
我几乎已经设法使用 sqs input plugin and the s3 output plugin 使用以下配置
来使它与 Logstash 一起工作
input {
sqs {
endpoint => "AWS_SQS_ENDPOINT"
queue => "logs"
}
}
output {
s3 {
access_key_id => "AWS_ACCESS_KEY_ID"
secret_access_key => "AWS_SECRET_ACCESS_KEY"
region => "AWS_REGION"
bucket => "AWS_BUCKET"
prefix => "audit/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/"
size_file => 128
time_file => 5
codec => "json_lines"
encoding => "gzip"
canned_acl => "private"
}
}
问题是 S3 输出插件需要与我们的查询工具不兼容的 @timestamp
字段。如果您使用 mutate 过滤器删除 @timestamp 或更改为 datetime,则它将不会处理日志。我们不能为每条记录存储日期时间字段和@timestamp,因为这会大大增加我们需要存储的数据量(数百万条日志)。
是否有任何其他软件替代品可以实现此结果?
由于 [Badger][https://whosebug.com/users/11792977/badger]
,更新了与 Logstash 一起使用的配置
input {
sqs {
endpoint => "http://AWS_SQS_ENDPOINT"
queue => "logs"
}
}
filter {
mutate {
add_field => {
"[@metadata][year]" => "%{+YYYY}"
"[@metadata][month]" => "%{+MM}"
"[@metadata][day]" => "%{+dd}"
}
remove_field => [ "@timestamp" ]
}
}
output {
s3 {
access_key_id => "AWS_ACCESS_KEY_ID"
secret_access_key => "AWS_SECRET_ACCESS_KEY"
region => "AWS_REGION"
bucket => "AWS_BUCKET"
prefix => "audit/year=%{[@metadata][year]}/month=%{[@metadata][month]}/day=%{[@metadata][day]}"
# 1 MB
size_file => 1024
# 1 Minute
time_file => 1
codec => "json_lines"
encoding => "gzip"
canned_acl => "private"
}
}
我在 s3 输出代码中没有看到任何对 @timestamp 的依赖。您已经通过在 prefix => "audit/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/"
中使用对它的 sprintf 引用创建了一个。您可以将这些 sprintf 引用移动到 mutate+add_field 过滤器,该过滤器将字段添加到 [@metadata],然后删除 @timestamp,然后在前缀选项中引用 [@metadata] 字段。
我需要能够将日志作为文本文件批量存储在 AWS S3 中,格式适合 JSON-SerDe。
其中一个批处理日志文件在 S3 上的外观示例,日期时间格式为 yyyy-MM-dd HH:mm:ss
{"message":"Message number 1","datetime":"2020-12-01 14:37:00"}
{"message":"Message number 2","datetime":"2020-12-01 14:38:00"}
{"message":"Message number 3","datetime":"2020-12-01 14:39:00"}
理想情况下,这些将每 5 秒或当排队的消息达到 50 条时存储在 S3 上,但也是可配置的。
我几乎已经设法使用 sqs input plugin and the s3 output plugin 使用以下配置
来使它与 Logstash 一起工作input {
sqs {
endpoint => "AWS_SQS_ENDPOINT"
queue => "logs"
}
}
output {
s3 {
access_key_id => "AWS_ACCESS_KEY_ID"
secret_access_key => "AWS_SECRET_ACCESS_KEY"
region => "AWS_REGION"
bucket => "AWS_BUCKET"
prefix => "audit/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/"
size_file => 128
time_file => 5
codec => "json_lines"
encoding => "gzip"
canned_acl => "private"
}
}
问题是 S3 输出插件需要与我们的查询工具不兼容的 @timestamp
字段。如果您使用 mutate 过滤器删除 @timestamp 或更改为 datetime,则它将不会处理日志。我们不能为每条记录存储日期时间字段和@timestamp,因为这会大大增加我们需要存储的数据量(数百万条日志)。
是否有任何其他软件替代品可以实现此结果?
由于 [Badger][https://whosebug.com/users/11792977/badger]
,更新了与 Logstash 一起使用的配置input {
sqs {
endpoint => "http://AWS_SQS_ENDPOINT"
queue => "logs"
}
}
filter {
mutate {
add_field => {
"[@metadata][year]" => "%{+YYYY}"
"[@metadata][month]" => "%{+MM}"
"[@metadata][day]" => "%{+dd}"
}
remove_field => [ "@timestamp" ]
}
}
output {
s3 {
access_key_id => "AWS_ACCESS_KEY_ID"
secret_access_key => "AWS_SECRET_ACCESS_KEY"
region => "AWS_REGION"
bucket => "AWS_BUCKET"
prefix => "audit/year=%{[@metadata][year]}/month=%{[@metadata][month]}/day=%{[@metadata][day]}"
# 1 MB
size_file => 1024
# 1 Minute
time_file => 1
codec => "json_lines"
encoding => "gzip"
canned_acl => "private"
}
}
我在 s3 输出代码中没有看到任何对 @timestamp 的依赖。您已经通过在 prefix => "audit/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/"
中使用对它的 sprintf 引用创建了一个。您可以将这些 sprintf 引用移动到 mutate+add_field 过滤器,该过滤器将字段添加到 [@metadata],然后删除 @timestamp,然后在前缀选项中引用 [@metadata] 字段。