用于接收来自 AWS SQS 的消息和 AWS S3 中的批存储的 Logstash 替代方案

Logstash alternative to receive messages from AWS SQS and batch store in AWS S3

我需要能够将日志作为文本文件批量存储在 AWS S3 中,格式适合 JSON-SerDe。

其中一个批处理日志文件在 S3 上的外观示例,日期时间格式为 yyyy-MM-dd HH:mm:ss

非常重要
{"message":"Message number 1","datetime":"2020-12-01 14:37:00"}
{"message":"Message number 2","datetime":"2020-12-01 14:38:00"}
{"message":"Message number 3","datetime":"2020-12-01 14:39:00"}

理想情况下,这些将每 5 秒或当排队的消息达到 50 条时存储在 S3 上,但也是可配置的。


我几乎已经设法使用 sqs input plugin and the s3 output plugin 使用以下配置

来使它与 Logstash 一起工作
input {
  sqs {
    endpoint => "AWS_SQS_ENDPOINT"
    queue => "logs"
  }
}

output {
   s3 {
     access_key_id => "AWS_ACCESS_KEY_ID"
     secret_access_key => "AWS_SECRET_ACCESS_KEY"
     region => "AWS_REGION"
     bucket => "AWS_BUCKET"
     prefix => "audit/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/"
     size_file => 128
     time_file => 5
     codec => "json_lines"
     encoding => "gzip"
     canned_acl => "private"
   }
}

问题是 S3 输出插件需要与我们的查询工具不兼容的 @timestamp 字段。如果您使用 mutate 过滤器删除 @timestamp 或更改为 datetime,则它将不会处理日志。我们不能为每条记录存储日期时间字段和@timestamp,因为这会大大增加我们需要存储的数据量(数百万条日志)。

是否有任何其他软件替代品可以实现此结果?


由于 [Badger][https://whosebug.com/users/11792977/badger]

,更新了与 Logstash 一起使用的配置
input {
  sqs {
    endpoint => "http://AWS_SQS_ENDPOINT"
    queue => "logs"
  }
}

filter {
  mutate {
    add_field => {
      "[@metadata][year]" => "%{+YYYY}"
      "[@metadata][month]" => "%{+MM}"
      "[@metadata][day]" => "%{+dd}"
    }
    remove_field => [ "@timestamp" ]
  }
}

output {
   s3 {
     access_key_id => "AWS_ACCESS_KEY_ID"
     secret_access_key => "AWS_SECRET_ACCESS_KEY"
     region => "AWS_REGION"
     bucket => "AWS_BUCKET"
     prefix => "audit/year=%{[@metadata][year]}/month=%{[@metadata][month]}/day=%{[@metadata][day]}"
     # 1 MB
     size_file => 1024
     # 1 Minute
     time_file => 1
     codec => "json_lines"
     encoding => "gzip"
     canned_acl => "private"
   }
}

我在 s3 输出代码中没有看到任何对 @timestamp 的依赖。您已经通过在 prefix => "audit/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/" 中使用对它的 sprintf 引用创建了一个。您可以将这些 sprintf 引用移动到 mutate+add_field 过滤器,该过滤器将字段添加到 [@metadata],然后删除 @timestamp,然后在前缀选项中引用 [@metadata] 字段。