AWS Firehose 转换 Lambda 函数为每条记录抛出 Lambda.MissingRecordId 错误
AWS Firehose transform Lambda function throws Lambda.MissingRecordId error for every record
我将 AWS firehose 与 S3 备份和 S3 目标存储桶一起使用,效果很好。当我尝试使用 Lambda 函数转换数据时出现问题。
我使用的是 .NET AWS SDK,我的 Lambda 函数是用 C# 编写的,并且使用的是随附的 firehose 转换示例:
[assembly:LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.JsonSerializer))]
namespace LambdaFunctions
{
public class Function
{
public KinesisFirehoseResponse FunctionHandler(KinesisFirehoseEvent evnt, ILambdaContext context)
{
context.Logger.LogLine($"InvocationId: {evnt.InvocationId}");
context.Logger.LogLine($"DeliveryStreamArn: {evnt.DeliveryStreamArn}");
context.Logger.LogLine($"Region: {evnt.Region}");
KinesisFirehoseResponse response = new KinesisFirehoseResponse
{
Records = new List<KinesisFirehoseResponse.FirehoseRecord>()
};
foreach (KinesisFirehoseEvent.FirehoseRecord record in evnt.Records)
{
context.Logger.LogLine($"\tRecordId: {record.RecordId}");
context.Logger.LogLine($"\t\tApproximateArrivalEpoch: {record.ApproximateArrivalEpoch}");
context.Logger.LogLine($"\t\tApproximateArrivalTimestamp: {record.ApproximateArrivalTimestamp}");
context.Logger.LogLine($"\t\tData: {record.DecodeData()}");
// Transform data: For example ToUpper the data
KinesisFirehoseResponse.FirehoseRecord transformedRecord = new KinesisFirehoseResponse.FirehoseRecord
{
RecordId = record.RecordId,
Result = KinesisFirehoseResponse.TRANSFORMED_STATE_OK
};
transformedRecord.EncodeData(record.DecodeData().ToUpperInvariant());
response.Records.Add(transformedRecord);
}
return response;
}
}
}
转换 Lambda 函数成功且正确地处理了数据(如测试和日志所示)。
但是,Lambda 函数没有成功将数据返回到 S3 目标存储桶,所有记录都未成功处理。
每条记录返回此错误:
{
"attemptsMade": 1,
"arrivalTimestamp": 1590656820209,
"errorCode": "Lambda.MissingRecordId",
"errorMessage": "One or more record Ids were not returned. Ensure that the Lambda function returns all received record Ids.",
"attemptEndingTimestamp": 1590656883464,
"rawData": "dGVzdDE=",
"lambdaArn": "arn:aws:lambda:Region:AccountNumber:function:transform-function:$LATEST"
}
我不知道为什么或在哪里发生此错误。我知道 Lambda 函数正在返回正确的响应,包括 recordId。
我已经重新创建了所有资源,应用并重新应用了权限,几乎完成了我能想到的所有事情。
使用 Node.js 或 Python 版本时不会发生此问题,它似乎是 .NET 实现所特有的。
编辑:
我忘记将序列化程序程序集属性添加到最终成为问题根源的原始代码块。
AWS提供的C#例子已经过时了,特别是这个序列化包:
Amazon.Lambda.Serialization.SystemTextJson
要解决这个问题,只需用这个替换包即可:
Amazon.Lambda.Serialization.Json
并像这样更新程序集属性:
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]
namespace LambdaFunctions
{
...
此序列化包在 "Serializing Lambda functions" 下的 AWS 文档 here 中提到。
但是,Amazon 尚未更新 SDK 示例以反映此更改(或者至少,具体来说是此示例),导致该功能在部署时失败。
我将 AWS firehose 与 S3 备份和 S3 目标存储桶一起使用,效果很好。当我尝试使用 Lambda 函数转换数据时出现问题。
我使用的是 .NET AWS SDK,我的 Lambda 函数是用 C# 编写的,并且使用的是随附的 firehose 转换示例:
[assembly:LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.JsonSerializer))]
namespace LambdaFunctions
{
public class Function
{
public KinesisFirehoseResponse FunctionHandler(KinesisFirehoseEvent evnt, ILambdaContext context)
{
context.Logger.LogLine($"InvocationId: {evnt.InvocationId}");
context.Logger.LogLine($"DeliveryStreamArn: {evnt.DeliveryStreamArn}");
context.Logger.LogLine($"Region: {evnt.Region}");
KinesisFirehoseResponse response = new KinesisFirehoseResponse
{
Records = new List<KinesisFirehoseResponse.FirehoseRecord>()
};
foreach (KinesisFirehoseEvent.FirehoseRecord record in evnt.Records)
{
context.Logger.LogLine($"\tRecordId: {record.RecordId}");
context.Logger.LogLine($"\t\tApproximateArrivalEpoch: {record.ApproximateArrivalEpoch}");
context.Logger.LogLine($"\t\tApproximateArrivalTimestamp: {record.ApproximateArrivalTimestamp}");
context.Logger.LogLine($"\t\tData: {record.DecodeData()}");
// Transform data: For example ToUpper the data
KinesisFirehoseResponse.FirehoseRecord transformedRecord = new KinesisFirehoseResponse.FirehoseRecord
{
RecordId = record.RecordId,
Result = KinesisFirehoseResponse.TRANSFORMED_STATE_OK
};
transformedRecord.EncodeData(record.DecodeData().ToUpperInvariant());
response.Records.Add(transformedRecord);
}
return response;
}
}
}
转换 Lambda 函数成功且正确地处理了数据(如测试和日志所示)。
但是,Lambda 函数没有成功将数据返回到 S3 目标存储桶,所有记录都未成功处理。
每条记录返回此错误:
{
"attemptsMade": 1,
"arrivalTimestamp": 1590656820209,
"errorCode": "Lambda.MissingRecordId",
"errorMessage": "One or more record Ids were not returned. Ensure that the Lambda function returns all received record Ids.",
"attemptEndingTimestamp": 1590656883464,
"rawData": "dGVzdDE=",
"lambdaArn": "arn:aws:lambda:Region:AccountNumber:function:transform-function:$LATEST"
}
我不知道为什么或在哪里发生此错误。我知道 Lambda 函数正在返回正确的响应,包括 recordId。
我已经重新创建了所有资源,应用并重新应用了权限,几乎完成了我能想到的所有事情。
使用 Node.js 或 Python 版本时不会发生此问题,它似乎是 .NET 实现所特有的。
编辑:
我忘记将序列化程序程序集属性添加到最终成为问题根源的原始代码块。
AWS提供的C#例子已经过时了,特别是这个序列化包:
Amazon.Lambda.Serialization.SystemTextJson
要解决这个问题,只需用这个替换包即可:
Amazon.Lambda.Serialization.Json
并像这样更新程序集属性:
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]
namespace LambdaFunctions
{
...
此序列化包在 "Serializing Lambda functions" 下的 AWS 文档 here 中提到。
但是,Amazon 尚未更新 SDK 示例以反映此更改(或者至少,具体来说是此示例),导致该功能在部署时失败。