AWS Lambda transformation for Firehose: Python

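I want to use a Lambda function to transform AWS Kinesis stream data and then deliver it to S3 with AWS Firehose. However, I keep getting this error: "errorCode":"Lambda.FunctionError","errorMessage":"The Lambda function was invoked successfully but it returned an error result."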

Here is my lambda_function:


import base64
import json

def lambda_handler(event, context):
    output = []
    for record in event['Records']:
        # your own business logic.
        json_object = {"name": "this is a test"}
        output_record = {
            'recordId': record['eventID'], # is this the problem? I used sequenceNumber, it is not right. 
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(json_object).encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['Records'])))
    return {'records': output}

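A related question was posted here, but the Kinesis data format there seems different from what I receive. Note that the event I get looks like the one below: the top-level key is the capitalized Records, not records, and each record has an eventID instead of a recordId.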

{
    'Records': [
        {
            'kinesis': {
                'kinesisSchemaVersion': '1.0', 
                'partitionKey': '1', 
                'sequenceNumber': '49603262076998903856573762341186472148109820820203765762', 
                'data':'eyJwcm9wIjogIjc5IiwgInRpbWVzdGFtcCI6ICIxNTk2MzE0MjM0IiwgInRoaW5nX2lkIjogImFhLWJiIn0=', 
                'approximateArrivalTimestamp': 1596314234.567
            }, 
            'eventSource': 'aws:kinesis', 
            'eventVersion': '1.0', 
            'eventID': 'shardId-000000000000:49603262076998903856573762341186472148109820820203765762', 
            'eventName': 'aws:kinesis:record', 
            'invokeIdentityArn':'xxx', 
            'awsRegion': 'us-east-1', 
            'eventSourceARN': 'xxx'
        }
    ]
}

It depends on how your Kinesis, Firehose, and Lambda pipeline is configured.
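Because the two setups deliver differently shaped events, one quick way to tell which one is invoking a function is to look at the event's top-level keys. A minimal sketch, assuming only the two event shapes shown in this answer occur; `detect_event_source` is a hypothetical helper, not an AWS API:

```python
from typing import Any, Dict


def detect_event_source(event: Dict[str, Any]) -> str:
    """Guess which pipeline invoked the function from the event's shape.

    Heuristic (assumption): only the two event shapes from this answer occur.
    """
    # Firehose transformation events: lowercase "records" plus stream metadata.
    if "records" in event and "deliveryStreamArn" in event:
        return "firehose-transformation"
    # Kinesis-triggered events: capitalized "Records", each tagged aws:kinesis.
    records = event.get("Records")
    if records and all(r.get("eventSource") == "aws:kinesis" for r in records):
        return "kinesis-trigger"
    return "unknown"
```

Printing `detect_event_source(event)` at the top of the handler and checking the CloudWatch log is usually enough to see which case applies.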

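If your Kinesis stream triggers a Lambda function that then delivers the data to Firehose, the function receives Kinesis record events. Check out Using AWS Lambda with Amazon Kinesis. A sample event is shown below: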

{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
                "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                "approximateArrivalTimestamp": 1545084711.166
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}
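In this first setup the function itself has to push the data to Firehose; the value it returns is not forwarded to Firehose. A minimal sketch, assuming boto3's `firehose` client and a hypothetical `DELIVERY_STREAM_NAME` environment variable (the default `my-delivery-stream` is a placeholder). It decodes each `kinesis.data` payload and sends it with `put_record_batch` in chunks of at most 500 records. Failed entries are counted but not retried, and the per-call size limit is not handled.

```python
import base64
import os
from typing import Any, Dict, List, Optional

# Hypothetical: the target delivery stream name comes from an environment variable.
DELIVERY_STREAM_NAME = os.environ.get("DELIVERY_STREAM_NAME", "my-delivery-stream")
# PutRecordBatch accepts at most 500 records per call.
BATCH_LIMIT = 500


def lambda_handler(event: Dict[str, Any], context: Any,
                   firehose_client: Optional[Any] = None) -> Dict[str, int]:
    # The client parameter exists only so the handler can be tested locally.
    if firehose_client is None:
        import boto3  # bundled with the AWS Lambda Python runtimes
        firehose_client = boto3.client("firehose")

    # Kinesis-triggered events: capitalized "Records", payload base64-encoded
    # under record["kinesis"]["data"]. A newline keeps objects separable in S3.
    payloads: List[bytes] = [
        base64.b64decode(record["kinesis"]["data"]) + b"\n"
        for record in event["Records"]
    ]

    failed = 0
    for start in range(0, len(payloads), BATCH_LIMIT):
        batch = payloads[start:start + BATCH_LIMIT]
        response = firehose_client.put_record_batch(
            DeliveryStreamName=DELIVERY_STREAM_NAME,
            Records=[{"Data": data} for data in batch],
        )
        # Failed entries are only counted here, not retried.
        failed += response.get("FailedPutCount", 0)

    forwarded = len(payloads) - failed
    print(f"Forwarded {forwarded} of {len(payloads)} records.")
    return {"forwarded": forwarded, "failed": failed}
```

Injecting the client keeps the handler testable without AWS credentials; in Lambda the default `boto3.client("firehose")` path is used.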

Another possible setup is Firehose polling the Kinesis stream. In that case we also have the flexibility to set up a transformation Lambda for Firehose (Amazon Kinesis Data Firehose Data Transformation). In this setup, the sample event looks as follows (Using AWS Lambda with Amazon Kinesis Data Firehose):
{
  "invocationId": "invoked123",
  "deliveryStreamArn": "aws:lambda:events",
  "region": "us-west-2",
  "records": [
    {
      "data": "SGVsbG8gV29ybGQ=",
      "recordId": "record1",
      "approximateArrivalTimestamp": 1510772160000,
      "kinesisRecordMetadata": {
        "shardId": "shardId-000000000000",
        "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a",
        "approximateArrivalTimestamp": "2012-04-23T18:25:43.511Z",
        "sequenceNumber": "49546986683135544286507457936321625675700192471156785154",
        "subsequenceNumber": ""
      }
    },
    {
      "data": "SGVsbG8gV29ybGQ=",
      "recordId": "record2",
      "approximateArrivalTimestamp": 151077216000,
      "kinesisRecordMetadata": {
        "shardId": "shardId-000000000001",
        "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a",
        "approximateArrivalTimestamp": "2012-04-23T19:25:43.511Z",
        "sequenceNumber": "49546986683135544286507457936321625675700192471156785155",
        "subsequenceNumber": ""
      }
    }
  ]
}
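In the second setup Firehose invokes the function with an event shaped like the one above, so the handler has to iterate over lowercase `records` and echo each `recordId` back. The question's handler reads `event['Records']`, which this shape does not contain, so it would raise `KeyError`; a failure of that kind is a plausible source of the `Lambda.FunctionError`. A minimal sketch, assuming JSON payloads; the `name` field stands in for real business logic, and `transform_record` is a hypothetical helper:

```python
import base64
import json
from typing import Any, Dict, List


def transform_record(record: Dict[str, Any]) -> Dict[str, str]:
    """Transform one Firehose record, echoing back its recordId."""
    try:
        obj = json.loads(base64.b64decode(record["data"]))
    except ValueError:  # bad base64, bad UTF-8, or invalid JSON
        obj = None
    if not isinstance(obj, dict):
        # Return the original data unchanged and flag the record.
        return {"recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"]}
    obj["name"] = "this is a test"  # hypothetical business logic
    encoded = json.dumps(obj) + "\n"  # newline-delimit objects in S3
    return {"recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode(encoded.encode("utf-8")).decode("utf-8")}


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, List[Dict[str, str]]]:
    # Firehose events use lowercase "records", and each item has a "recordId".
    output = [transform_record(record) for record in event["records"]]
    print(f"Successfully processed {len(output)} records.")
    return {"records": output}
```

With the sample event above, both records decode to the plain text "Hello World", which is not JSON, so this sketch would mark them `ProcessingFailed` and pass the original data through.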
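  1. The error in the question (the Lambda.FunctionError reported by Firehose) seems to relate to the second setup.
  2. The event you posted, however, suggests that your data pipeline is using the first setup.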
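Point 1 can be checked locally before deploying. Firehose documents a contract for transformation responses: every incoming `recordId` comes back unchanged, `result` is one of `Ok`, `Dropped`, or `ProcessingFailed`, and `data` is base64-encoded. A minimal sketch of a hypothetical test helper, `check_transformation_response` (not an AWS API), that reports violations of those rules:

```python
import base64
from typing import Any, Dict, List

VALID_RESULTS = {"Ok", "Dropped", "ProcessingFailed"}


def check_transformation_response(event: Dict[str, Any],
                                  response: Dict[str, Any]) -> List[str]:
    """List contract violations; an empty list means none were found."""
    if not isinstance(response.get("records"), list):
        return ["response has no 'records' list"]
    out_records = response["records"]
    expected = [r["recordId"] for r in event.get("records", [])]
    returned = [r.get("recordId") for r in out_records]

    problems: List[str] = []
    # Every input recordId must come back, and no unknown ones may appear.
    missing = [rid for rid in expected if rid not in returned]
    unknown = [rid for rid in returned if rid not in expected]
    if missing:
        problems.append(f"recordIds not returned: {missing}")
    if unknown:
        problems.append(f"recordIds not in the input: {unknown}")

    for out in out_records:
        rid = out.get("recordId")
        if out.get("result") not in VALID_RESULTS:
            problems.append(f"{rid}: invalid result {out.get('result')!r}")
        data = out.get("data")
        if not isinstance(data, str):
            problems.append(f"{rid}: data missing or not a string")
            continue
        try:
            base64.b64decode(data, validate=True)
        except ValueError:  # binascii.Error is a subclass of ValueError
            problems.append(f"{rid}: data is not valid base64")
    return problems
```

Running a handler against the sample Firehose event and passing the event and the handler's return value to this helper is a cheap sanity check; it does not exercise IAM permissions, timeouts, or payload-size limits.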