Kinesis:一个 Webhook 出现“(ValidationException) 调用 PutRecord 操作时”错误。然而,另一个 Webhook 成功了

Kinesis: "(ValidationException) when calling the PutRecord operation" error for one Webhook. However another Webhook Succeeds

我正在尝试通过运动传输流将 Punchh webhook JSON 数据流式传输到多个 S3 存储桶,但它给了我这个错误。但是,当我流式传输 Iterable webhook 数据时,Iterable S3 存储桶已成功填充来自 Iterable wehbook 发送的可交付流的数据。

Here is the error:

An error occurred (ValidationException) when calling the PutRecord operation: 1 validation error detected: Value 'data/landing/vendor/punchh/punchh_to_dd/ex_points_reminder' at 'deliveryStreamName' failed to satisfy constraint: Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+

注意: S3 prefix value -in bold- 上面说的错误和我测试成功的Iterable prefixes类似.

这是导致错误的流之一的 Punchh S3 前缀:

data/landing/vendor/punchh/punchh_to_dd/ex-points-reminder/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/dynamic

这是 Punch S3 流的目的地(注意 X 用作占位符以隐藏无关的真实地址):

arn:aws:firehose:us-west-2:XXXXXXXXXXXX:deliverystream/lou-punchh-ex-points-reminder-events

这是代理(后端)Lambda 的 Python 代码(Punchh 的类似代码作为 Iterable)。我有几个动态 S3 存储桶与它们各自的传输流配对:

import boto3
import json
import base64
import os

client = boto3.client('firehose', region_name= 'us-west-2')
delivery_stream_lou = {}
def lambda_handler(event, context):
    try:
        print(event)
        print('<< EVENT BODY  >>',event['event_name'])
        try:
           
            print('<< EVENT NAME >> ',event['event_name'])
            eventName = event['event_name']
            eventType =  event['event_type']
            if 'users' == eventName.lower():
                #ADD firehose KINESIS for USERS
                print('USERS FOUND')
                delivery_stream_lou = os.environ['DSTREAMUSERS']
               
            elif 'loyalty' in eventType.lower():
                #ADD firehose KINESIS for LOYALTY
                print('LOYALTY FOUND')
                delivery_stream_lou = os.environ['DSTREAMLOYALTY']
                
            elif 'gift' == eventType.lower():
                #ADD firehose KINESIS for LOYALTY
                print('GIFT FOUND')
                delivery_stream_lou = os.environ['DSTREAMGIFT']
                
            elif 'redemptions' == eventName.lower():
                #ADD firehose KINESIS for REDEMPTIONS
                print('REDEMPTIONS FOUND')
                delivery_stream_lou = os.environ['DSTREAMREDEMPTIONS']
                
            elif 'rewards' == eventName.lower():
                #ADD firehose KINESIS for REWARDS
                print('REWARDS FOUND')
                delivery_stream_lou = os.environ['DSTREAMREWARDS']
                
            elif 'redeemables' == eventName.lower():
                #ADD firehose KINESIS for REDEEMABLES
                print('REDEEMABLES FOUND')
                delivery_stream_lou = os.environ['DSTREAMREDEEMABLES']
            
            elif 'points_expiry_reminder' == eventType.lower():
                #ADD firehose KINESIS for EXPIRY PT REMINDER
                print('EXPIRY PT REMINDER FOUND')
                delivery_stream_lou = os.environ['DSTREAMEXPOINTSTREMINDER']  
                print('<< DELIVERY STREAM LOU >> ', delivery_stream_lou)
            
            elif 'points_expiry' == eventType.lower():
                #ADD firehose KINESIS for EXPIRY PT REMINDER
                print('EXPIRY PT FOUND')
                delivery_stream_lou = os.environ['DSTREAMEXPOINTS']
                
            elif 'signup_campaign' == eventType.lower():
                #ADD firehose KINESIS for SIGNUP CAMPAIGN
                print('SIGNUP FOUND')
                delivery_stream_lou = os.environ['DSTREAMSIGNUP']  
                
            else:
                print('<< ERROR - NO VALID EVENT NAME FOUND !!  >>')
                
            if delivery_stream_lou !='' and delivery_stream_lou is not None and delivery_stream_lou != 'null':
                jsonData = json.dumps(event) + '\n' 
                print('<< JSON DATA >> ',jsonData)  
                response = client.put_record(DeliveryStreamName=delivery_stream_lou, Record={'Data': jsonData})  
                print(response)
                return {
                    'statusCode': 200,
                    'headers': {'Content-Type': 'application/json'},
                    'body': json.dumps({"message":"success"})
                }
        except Exception as e: 
            print(str(e))
            return {
                'statusCode': 200,
                'headers': {'Content-Type': 'application/json'},
                'body': json.dumps({"message":"process failed"})
            }
    except Exception as e:
        print(str(e))
        return {
            'statusCode': 200,
            'headers': {'Content-Type': 'application/json'},
            'body': json.dumps({"message":"process failed 2"})
        }

Note: The code is almost identical for this Punchh lambda as the Iterable Lambda except for the different os.environment variables that identify the different buck prefixes.

感谢您的帮助。

根据错误消息 python 代码正在将 data/landing/vendor/punchh/punchh_to_dd/ex_points_reminder 值传递给 PutRecord 请求中的 deliveryStreamName 参数。我认为那不是您的交付流的名称。检查正在填充 deliveryStreamName 参数

的环境变量的值