使用动态分区通过 Lambda 创建 Firehose 数据流时出错

Error creating a Firehose Datastream through Lambda with Dynamic Partitioning

我正在尝试通过 Lambda 函数使用动态分区创建 Kinesis Firehose 流。如果我删除 DynamicPartitioningConfigurationProcessingConfiguration 元素,并将前缀更改为动态分区友好字符串,该函数将创建一个流。问题是如果这些元素在里面,我会得到一个

的错误
kinesis error UnexpectedParameter: Unexpected key 'DynamicPartitioningConfiguration' found in params

这是创建流的函数,删除的 ARN 和 Glue 信息都可以在删除动态分区内容的情况下正常工作。

const AWS = require("aws-sdk");
var firehose = new AWS.Firehose();

exports.handler = async (event) => {
    
    let params = {
      DeliveryStreamName: event.body.shop+"-data-stream",
      DeliveryStreamType: "DirectPut",
      ExtendedS3DestinationConfiguration: {
        BucketARN: 'arn:aws:s3:::'+event.body.shop,
        RoleARN: 'aValidARN',
        Prefix: '!{partitionKeyFromQuery:obj-type}/',
        ErrorOutputPrefix: 'errors/',
        DataFormatConversionConfiguration: {
            Enabled: true,
            InputFormatConfiguration: {
              Deserializer: {
                  OpenXJsonSerDe: {
                    CaseInsensitive: false,
                    ConvertDotsInJsonKeysToUnderscores: false
                  }
              }
            },  
            OutputFormatConfiguration: {
                Serializer: {
                  ParquetSerDe: {
                    Compression: "SNAPPY",
                    WriterVersion: "V1"
                  }                    
                }
            },
            SchemaConfiguration: {
                RoleARN: "aValidARN",
                DatabaseName: "aValidDBName",
                TableName: "aValidTableName"
            }
        },
        DynamicPartitioningConfiguration: {
            Enabled: "true"
        },  
        ProcessingConfiguration: {
          Enabled: true,
          Processors: [
            {
              Type: "MetadataExtraction",
              Parameters: [
                {
                  ParameterName: "MetadataExtractionQuery",
                  ParameterValue: 'type:.type'
                },
                {
                  ParameterName: "JsonParsingEngine",
                  ParameterValue: 'JQ-1.6'
                }
              ]
            }
          ]
        },         
      }, 
      Tags: [
        {
          Key: 'billing-'+event.body.shop,
          Value: 'true'
        }
      ]
    };
    
    try{
        await firehose.createDeliveryStream(params).promise();    
    }catch(e){
        console.log("kinesis error "+e);
        return {
            statusCode: 500,
            error: JSON.stringify(e)
        };          
    }

    return {
        statusCode: 200,
        body: JSON.stringify("Stream created"),
    };
};

这方面的文档帮助不大。我正在使用 JS SDK 参考,该元素恰好位于它应该位于的位置,嵌套在 ExtendedS3DestinationConfiguration 元素中。我认为存在一些未记录的设置冲突,这就是意外密钥 return 的原因,但我是 firehose 的新手,缺乏解决此问题的知识。我根据 and the cloudformation docs at https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-kinesisfirehose-deliverystream-extendeds3destinationconfiguration.html#cfn-kinesisfirehose-deliverystream-extendeds3destinationconfiguration-dynamicpartitioningconfiguration 的答案创建了 ProcessingConfiguration 元素。

如有任何帮助,我们将不胜感激

检查AWS JS SDK的版本。 v2.979.0 推出了 Firehose 动态分区功能。使用JS SDK v2.979.0或以上版本

之前使用 python SDK 时遇到过类似问题。升级版本解决了这个问题

SDK 可以作为一个层添加到 lambda 函数中,这使得最近的更改可用。我使用 https://aws.amazon.com/premiumsupport/knowledge-center/lambda-layer-aws-sdk-latest-version/ 中的说明更新到 sdk 的 v2.997。也可以使用此方法在 > v3 中添加单个包。

我将参数更改为

params = {
  DeliveryStreamName: event.body.shop+"-data-stream",
  DeliveryStreamType: "DirectPut",
  ExtendedS3DestinationConfiguration: {
    BucketARN: 'arn:aws:s3:::'+event.body.shop,
    RoleARN: 'aWorkingARN',
    Prefix: '!{partitionKeyFromQuery:year}/!{partitionKeyFromQuery:month}/!{partitionKeyFromQuery:day}/!{partitionKeyFromQuery:hour}/!{partitionKeyFromQuery:object_type}/',
    ErrorOutputPrefix: 'errors/',
    DataFormatConversionConfiguration: {
        Enabled: true,
        InputFormatConfiguration: {
          Deserializer: {
              OpenXJsonSerDe: {
                CaseInsensitive: false,
                ConvertDotsInJsonKeysToUnderscores: false
              }
          }
        },  
        OutputFormatConfiguration: {
            Serializer: {
              ParquetSerDe: {
                Compression: "SNAPPY",
                WriterVersion: "V1"
              }                    
            }
        },
        SchemaConfiguration: {
            RoleARN: "aWorkingARN",
            DatabaseName: "aWorkingDB",
            TableName: "aWorkingTable"
        }
    },
    DynamicPartitioningConfiguration: {
        Enabled: true
    },       
    DataFormatConversionConfiguration: {
        InputFormatConfiguration: {
          Deserializer: {
              OpenXJsonSerDe: {
                CaseInsensitive: false,
                ConvertDotsInJsonKeysToUnderscores: false
              }
          }
        },  
        OutputFormatConfiguration: {
            Serializer: {
              ParquetSerDe: {
                Compression: "SNAPPY",
                WriterVersion: "V1"
              }                    
            }
        },
        SchemaConfiguration: {
            RoleARN: "aWorkingARN",
            DatabaseName: "shopifystructures",
            TableName: "shopifyproductscrawler_test"
        }
    },
    ProcessingConfiguration: {
      Enabled: true,
      Processors: [
        {
          Type: "MetadataExtraction",
          Parameters: [
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{object_type:.object_type}'
            },
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{hour:.hour}'
            },
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{day:.day}'
            },
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{month:.month}'
            },
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{year:.year}'
            },                
            {
              ParameterName: "JsonParsingEngine",
              ParameterValue: 'JQ-1.6'
            }
          ]
        }
      ]
    }        
  }, 
  Tags: [
    {
      Key: 'billing-'+event.body.shop,
      Value: 'true'
    }
  ]
};

并且功能完美运行。