Cloudwatch Logs -> Kinesis Firehose -> S3 - not proper JSON?

I am trying to build a centralized logging solution using CloudWatch subscription filters to write logs to Kinesis Firehose -> S3 -> AWS Glue -> Athena. I am running into a lot of data format problems.

Initially I was using the S3DestinationConfiguration of AWS::KinesisFirehose to write to S3, and then tried to crawl the data with an AWS::Glue::Crawler or to create the table manually in the CloudFormation template. I found that the Crawler had a lot of trouble determining the data format on S3 (it detected ION rather than JSON, and Athena cannot query ION). I am now trying the ExtendedS3DestinationConfiguration, which allows explicitly configuring the input and output formats to force Parquet.

Unfortunately, with this setup Kinesis Firehose returns error logs saying the input is not valid JSON. That makes me wonder whether the CloudWatch subscription filter is not writing proper JSON - but there are no configuration options on that object to control the data format.

This is not a particularly unusual problem statement, so someone out there must have the correct configuration. Here are some snippets of my failing configuration:

ExtendedS3DestinationConfiguration:
        BucketARN: !Sub arn:aws:s3:::${S3Bucket}
        Prefix: !Sub ${S3LogsPath}year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
        ErrorOutputPrefix: !Sub ${FailedWritePath}
        BufferingHints:
          IntervalInSeconds: 300
          SizeInMBs: 128
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Sub ${AppId}-logstream-${Environment}
          LogStreamName: logs
        CompressionFormat: UNCOMPRESSED
        RoleARN: !GetAtt FirehoseRole.Arn
        DataFormatConversionConfiguration:
          Enabled: true
          InputFormatConfiguration:
            Deserializer:
              OpenXJsonSerDe: {}
          OutputFormatConfiguration:
            Serializer:
              ParquetSerDe: {}
          SchemaConfiguration:
            CatalogId: !Ref AWS::AccountId
            DatabaseName: !Ref CentralizedLoggingDatabase
            Region: !Ref AWS::Region
            RoleARN: !GetAtt FirehoseRole.Arn
            TableName: !Ref LogsGlueTable
            VersionId: LATEST

Previous configuration:

S3DestinationConfiguration:
        BucketARN: !Sub arn:aws:s3:::${S3Bucket}
        Prefix: !Sub ${S3LogsPath}year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
        ErrorOutputPrefix: !Sub ${FailedWritePath}
        BufferingHints:
          IntervalInSeconds: 300
          SizeInMBs: 128
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Sub ${AppId}-logstream-${Environment}
          LogStreamName: logs
        CompressionFormat: GZIP
        RoleARN: !GetAtt FirehoseRole.Arn

And the crawler:

Type: AWS::Glue::Crawler
    Properties:
      Name: !Sub ${DNSEndPoint}_logging_s3_crawler_${Environment}
      DatabaseName: !Ref CentralizedLoggingDatabase
      Description: AWS Glue crawler to crawl logs on S3
      Role: !GetAtt CentralizedLoggingGlueRole.Arn
#      Schedule: ## run on demand
#        ScheduleExpression: cron(40 * * * ? *)
      Targets:
        S3Targets:
          - Path: !Sub s3://${S3Bucket}/${S3LogsPath}
      SchemaChangePolicy:
        UpdateBehavior: UPDATE_IN_DATABASE
        DeleteBehavior: LOG
      TablePrefix: !Sub ${AppId}_${Environment}_

The error, using ExtendedS3DestinationConfiguration:

"attemptsMade":1,"arrivalTimestamp":1582650068665,"lastErrorCode":"DataFormatConversion.ParseError","lastErrorMessage":"遇到格式错误JSON. 非法字符 ((CTRL-CHAR, code 31)): 在 [Source: com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream 的 tokens\n 之间只允许普通的白色 space (\r, \n, \t) ]@2ce955fc;行:1,列:2]

It seems like there is some configuration problem here, but I cannot find it.

So I just went through this in a similar scenario, and I have it working now.

Firehose writes the logs to S3 compressed and Base64-encoded, as an array of JSON records. For Athena to read the data it needs to be decompressed, with one JSON record per line. (The CTRL-CHAR code 31 in the error above is 0x1F, the first byte of a gzip stream, which is why the data format conversion rejects the records as malformed JSON.)

So, create a Lambda function from the blueprint kinesis-firehose-cloudwatch-logs-processor, enable transformation on your Firehose, and specify that Lambda function. This will decompress the data and put the JSON into S3 with one record per line.
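
In CloudFormation terms, enabling the transformation means adding a ProcessingConfiguration to the delivery stream's destination configuration and pointing it at the processor function. A minimal sketch, assuming the blueprint function is declared elsewhere in the template under the hypothetical logical ID LogProcessorFunction:

ProcessingConfiguration:
  Enabled: true
  Processors:
    - Type: Lambda
      Parameters:
        - ParameterName: LambdaArn
          # LogProcessorFunction is a placeholder name for your blueprint-based function
          ParameterValue: !GetAtt LogProcessorFunction.Arn

With the Lambda processor doing the decompression, the data lands in S3 as plain newline-delimited JSON rather than Parquet, which is what the table definition below expects.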

Create the Athena table:

CREATE EXTERNAL TABLE mydb.mytable(
  eventversion string COMMENT 'from deserializer', 
  useridentity struct<type:string,principalid:string,arn:string,accountid:string,invokedby:string,accesskeyid:string,username:string,sessioncontext:struct<attributes:struct<mfaauthenticated:string,creationdate:string>,sessionissuer:struct<type:string,principalid:string,arn:string,accountid:string,username:string>>> COMMENT 'from deserializer', 
  eventtime string COMMENT 'from deserializer', 
  eventsource string COMMENT 'from deserializer', 
  eventname string COMMENT 'from deserializer', 
  awsregion string COMMENT 'from deserializer', 
  sourceipaddress string COMMENT 'from deserializer', 
  useragent string COMMENT 'from deserializer', 
  errorcode string COMMENT 'from deserializer', 
  errormessage string COMMENT 'from deserializer', 
  requestparameters string COMMENT 'from deserializer', 
  responseelements string COMMENT 'from deserializer', 
  additionaleventdata string COMMENT 'from deserializer', 
  requestid string COMMENT 'from deserializer', 
  eventid string COMMENT 'from deserializer', 
  resources array<struct<arn:string,accountid:string,type:string>> COMMENT 'from deserializer', 
  eventtype string COMMENT 'from deserializer', 
  apiversion string COMMENT 'from deserializer', 
  readonly string COMMENT 'from deserializer', 
  recipientaccountid string COMMENT 'from deserializer', 
  serviceeventdetails string COMMENT 'from deserializer', 
  sharedeventid string COMMENT 'from deserializer', 
  vpcendpointid string COMMENT 'from deserializer', 
  managementevent boolean COMMENT 'from deserializer', 
  eventcategory string COMMENT 'from deserializer')
PARTITIONED BY ( 
  datehour string)
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
WITH SERDEPROPERTIES ( 
  'paths'='awsRegion,eventCategory,eventID,eventName,eventSource,eventTime,eventType,eventVersion,managementEvent,readOnly,recipientAccountId,requestID,requestParameters,responseElements,sourceIPAddress,userAgent,userIdentity') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://mybucket/prefix'
TBLPROPERTIES (
  'projection.datehour.format'='yyyy/MM/dd/HH', 
  'projection.datehour.interval'='1', 
  'projection.datehour.interval.unit'='HOURS', 
  'projection.datehour.range'='2021/01/01/00,NOW', 
  'projection.datehour.type'='date', 
  'projection.enabled'='true', 
  'storage.location.template'='s3://mybucket/myprefix/${datehour}'
)
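
With partition projection configured as above, no crawler or MSCK REPAIR TABLE run is needed; Athena derives the datehour partitions from the projection settings. A hypothetical query against this table could then filter on the partition key directly:

SELECT eventtime, eventname, sourceipaddress, errorcode
FROM mydb.mytable
WHERE datehour BETWEEN '2021/06/01/00' AND '2021/06/01/23'
  AND errorcode IS NOT NULL
LIMIT 100;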