使用动态分区通过 Lambda 创建 Firehose 数据流时出错
Error creating a Firehose Datastream through Lambda with Dynamic Partitioning
我正在尝试通过 Lambda 函数创建一个启用动态分区的 Kinesis Firehose 流。如果我删除 DynamicPartitioningConfiguration 和 ProcessingConfiguration 元素,并将前缀更改为动态分区友好字符串,该函数就能创建流。问题是,只要保留这些元素,我就会得到如下错误:
kinesis error UnexpectedParameter: Unexpected key 'DynamicPartitioningConfiguration' found in params
下面是创建流的函数;其中的 ARN 和 Glue 信息已做脱敏处理,去掉动态分区相关内容后它们都能正常工作。
const AWS = require("aws-sdk");
var firehose = new AWS.Firehose();
exports.handler = async (event) => {
let params = {
DeliveryStreamName: event.body.shop+"-data-stream",
DeliveryStreamType: "DirectPut",
ExtendedS3DestinationConfiguration: {
BucketARN: 'arn:aws:s3:::'+event.body.shop,
RoleARN: 'aValidARN',
Prefix: '!{partitionKeyFromQuery:obj-type}/',
ErrorOutputPrefix: 'errors/',
DataFormatConversionConfiguration: {
Enabled: true,
InputFormatConfiguration: {
Deserializer: {
OpenXJsonSerDe: {
CaseInsensitive: false,
ConvertDotsInJsonKeysToUnderscores: false
}
}
},
OutputFormatConfiguration: {
Serializer: {
ParquetSerDe: {
Compression: "SNAPPY",
WriterVersion: "V1"
}
}
},
SchemaConfiguration: {
RoleARN: "aValidARN",
DatabaseName: "aValidDBName",
TableName: "aValidTableName"
}
},
DynamicPartitioningConfiguration: {
Enabled: "true"
},
ProcessingConfiguration: {
Enabled: true,
Processors: [
{
Type: "MetadataExtraction",
Parameters: [
{
ParameterName: "MetadataExtractionQuery",
ParameterValue: 'type:.type'
},
{
ParameterName: "JsonParsingEngine",
ParameterValue: 'JQ-1.6'
}
]
}
]
},
},
Tags: [
{
Key: 'billing-'+event.body.shop,
Value: 'true'
}
]
};
try{
await firehose.createDeliveryStream(params).promise();
}catch(e){
console.log("kinesis error "+e);
return {
statusCode: 500,
error: JSON.stringify(e)
};
}
return {
statusCode: 200,
body: JSON.stringify("Stream created"),
};
};
这方面的文档帮助不大。我参照的是 JS SDK 参考文档,该元素的位置与文档一致,嵌套在 ExtendedS3DestinationConfiguration 元素中。我猜测存在某种未记录的设置冲突,才导致返回 Unexpected key 错误,但我是 Firehose 新手,缺乏解决此问题的知识。ProcessingConfiguration 元素是我根据一个相关回答以及 CloudFormation 文档(https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-kinesisfirehose-deliverystream-extendeds3destinationconfiguration.html#cfn-kinesisfirehose-deliverystream-extendeds3destinationconfiguration-dynamicpartitioningconfiguration)创建的。
如有任何帮助,我们将不胜感激
检查AWS JS SDK的版本。 v2.979.0 推出了 Firehose 动态分区功能。使用JS SDK v2.979.0或以上版本
之前使用 python SDK 时遇到过类似问题。升级版本解决了这个问题
可以将 SDK 作为层(layer)添加到 Lambda 函数中,这样就能使用最新版本中的改动。我按照 https://aws.amazon.com/premiumsupport/knowledge-center/lambda-layer-aws-sdk-latest-version/ 中的说明把 SDK 更新到了 v2.997。对于 v3 及更高版本,也可以用这种方法添加单个包。
我将参数更改为
// Delivery-stream parameters for firehose.createDeliveryStream(): a DirectPut
// stream that converts JSON to Parquet and partitions S3 output by
// year/month/day/hour/object_type extracted from each record.
// `event` and `params` come from the enclosing handler.
params = {
  DeliveryStreamName: `${event.body.shop}-data-stream`,
  DeliveryStreamType: "DirectPut",
  ExtendedS3DestinationConfiguration: {
    BucketARN: `arn:aws:s3:::${event.body.shop}`,
    RoleARN: 'aWorkingARN',
    // Every partitionKeyFromQuery key below must be produced by a
    // MetadataExtractionQuery in ProcessingConfiguration.
    Prefix: '!{partitionKeyFromQuery:year}/!{partitionKeyFromQuery:month}/!{partitionKeyFromQuery:day}/!{partitionKeyFromQuery:hour}/!{partitionKeyFromQuery:object_type}/',
    ErrorOutputPrefix: 'errors/',
    // Declared exactly once: a repeated key in an object literal silently
    // overrides the earlier one, so the duplicate definition was removed.
    DataFormatConversionConfiguration: {
      Enabled: true,
      InputFormatConfiguration: {
        Deserializer: {
          OpenXJsonSerDe: {
            CaseInsensitive: false,
            ConvertDotsInJsonKeysToUnderscores: false,
          },
        },
      },
      OutputFormatConfiguration: {
        Serializer: {
          ParquetSerDe: {
            Compression: "SNAPPY",
            WriterVersion: "V1",
          },
        },
      },
      SchemaConfiguration: {
        RoleARN: "aWorkingARN",
        DatabaseName: "aWorkingDB",
        TableName: "aWorkingTable",
      },
    },
    DynamicPartitioningConfiguration: {
      Enabled: true,
    },
    ProcessingConfiguration: {
      Enabled: true,
      Processors: [
        {
          Type: "MetadataExtraction",
          // NOTE(review): AWS examples put all keys into a single
          // MetadataExtractionQuery (e.g. '{year:.year,month:.month}');
          // confirm that repeated entries behave as intended.
          Parameters: [
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{object_type:.object_type}',
            },
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{hour:.hour}',
            },
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{day:.day}',
            },
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{month:.month}',
            },
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{year:.year}',
            },
            {
              ParameterName: "JsonParsingEngine",
              ParameterValue: 'JQ-1.6',
            },
          ],
        },
      ],
    },
  },
  Tags: [
    {
      Key: `billing-${event.body.shop}`,
      Value: 'true',
    },
  ],
};
之后函数就能正常运行了。
我正在尝试通过 Lambda 函数创建一个启用动态分区的 Kinesis Firehose 流。如果我删除 DynamicPartitioningConfiguration 和 ProcessingConfiguration 元素,并将前缀更改为动态分区友好字符串,该函数就能创建流。问题是,只要保留这些元素,我就会得到如下错误:
kinesis error UnexpectedParameter: Unexpected key 'DynamicPartitioningConfiguration' found in params
下面是创建流的函数;其中的 ARN 和 Glue 信息已做脱敏处理,去掉动态分区相关内容后它们都能正常工作。
const AWS = require("aws-sdk");
var firehose = new AWS.Firehose();
exports.handler = async (event) => {
let params = {
DeliveryStreamName: event.body.shop+"-data-stream",
DeliveryStreamType: "DirectPut",
ExtendedS3DestinationConfiguration: {
BucketARN: 'arn:aws:s3:::'+event.body.shop,
RoleARN: 'aValidARN',
Prefix: '!{partitionKeyFromQuery:obj-type}/',
ErrorOutputPrefix: 'errors/',
DataFormatConversionConfiguration: {
Enabled: true,
InputFormatConfiguration: {
Deserializer: {
OpenXJsonSerDe: {
CaseInsensitive: false,
ConvertDotsInJsonKeysToUnderscores: false
}
}
},
OutputFormatConfiguration: {
Serializer: {
ParquetSerDe: {
Compression: "SNAPPY",
WriterVersion: "V1"
}
}
},
SchemaConfiguration: {
RoleARN: "aValidARN",
DatabaseName: "aValidDBName",
TableName: "aValidTableName"
}
},
DynamicPartitioningConfiguration: {
Enabled: "true"
},
ProcessingConfiguration: {
Enabled: true,
Processors: [
{
Type: "MetadataExtraction",
Parameters: [
{
ParameterName: "MetadataExtractionQuery",
ParameterValue: 'type:.type'
},
{
ParameterName: "JsonParsingEngine",
ParameterValue: 'JQ-1.6'
}
]
}
]
},
},
Tags: [
{
Key: 'billing-'+event.body.shop,
Value: 'true'
}
]
};
try{
await firehose.createDeliveryStream(params).promise();
}catch(e){
console.log("kinesis error "+e);
return {
statusCode: 500,
error: JSON.stringify(e)
};
}
return {
statusCode: 200,
body: JSON.stringify("Stream created"),
};
};
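这方面的文档帮助不大。我参照的是 JS SDK 参考文档,该元素的位置与文档一致,嵌套在 ExtendedS3DestinationConfiguration 元素中。我猜测存在某种未记录的设置冲突,才导致返回 Unexpected key 错误,但我是 Firehose 新手,缺乏解决此问题的知识。ProcessingConfiguration 元素是我根据一个相关回答以及 CloudFormation 文档(https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-kinesisfirehose-deliverystream-extendeds3destinationconfiguration.html#cfn-kinesisfirehose-deliverystream-extendeds3destinationconfiguration-dynamicpartitioningconfiguration)创建的。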
如有任何帮助,我们将不胜感激
检查AWS JS SDK的版本。 v2.979.0 推出了 Firehose 动态分区功能。使用JS SDK v2.979.0或以上版本
之前使用 python SDK 时遇到过类似问题。升级版本解决了这个问题
可以将 SDK 作为层(layer)添加到 Lambda 函数中,这样就能使用最新版本中的改动。我按照 https://aws.amazon.com/premiumsupport/knowledge-center/lambda-layer-aws-sdk-latest-version/ 中的说明把 SDK 更新到了 v2.997。对于 v3 及更高版本,也可以用这种方法添加单个包。
我将参数更改为
// Delivery-stream parameters for firehose.createDeliveryStream(): a DirectPut
// stream that converts JSON to Parquet and partitions S3 output by
// year/month/day/hour/object_type extracted from each record.
// `event` and `params` come from the enclosing handler.
params = {
  DeliveryStreamName: `${event.body.shop}-data-stream`,
  DeliveryStreamType: "DirectPut",
  ExtendedS3DestinationConfiguration: {
    BucketARN: `arn:aws:s3:::${event.body.shop}`,
    RoleARN: 'aWorkingARN',
    // Every partitionKeyFromQuery key below must be produced by a
    // MetadataExtractionQuery in ProcessingConfiguration.
    Prefix: '!{partitionKeyFromQuery:year}/!{partitionKeyFromQuery:month}/!{partitionKeyFromQuery:day}/!{partitionKeyFromQuery:hour}/!{partitionKeyFromQuery:object_type}/',
    ErrorOutputPrefix: 'errors/',
    // Declared exactly once: a repeated key in an object literal silently
    // overrides the earlier one, so the duplicate definition was removed.
    DataFormatConversionConfiguration: {
      Enabled: true,
      InputFormatConfiguration: {
        Deserializer: {
          OpenXJsonSerDe: {
            CaseInsensitive: false,
            ConvertDotsInJsonKeysToUnderscores: false,
          },
        },
      },
      OutputFormatConfiguration: {
        Serializer: {
          ParquetSerDe: {
            Compression: "SNAPPY",
            WriterVersion: "V1",
          },
        },
      },
      SchemaConfiguration: {
        RoleARN: "aWorkingARN",
        DatabaseName: "aWorkingDB",
        TableName: "aWorkingTable",
      },
    },
    DynamicPartitioningConfiguration: {
      Enabled: true,
    },
    ProcessingConfiguration: {
      Enabled: true,
      Processors: [
        {
          Type: "MetadataExtraction",
          // NOTE(review): AWS examples put all keys into a single
          // MetadataExtractionQuery (e.g. '{year:.year,month:.month}');
          // confirm that repeated entries behave as intended.
          Parameters: [
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{object_type:.object_type}',
            },
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{hour:.hour}',
            },
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{day:.day}',
            },
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{month:.month}',
            },
            {
              ParameterName: "MetadataExtractionQuery",
              ParameterValue: '{year:.year}',
            },
            {
              ParameterName: "JsonParsingEngine",
              ParameterValue: 'JQ-1.6',
            },
          ],
        },
      ],
    },
  },
  Tags: [
    {
      Key: `billing-${event.body.shop}`,
      Value: 'true',
    },
  ],
};
之后函数就能正常运行了。