Kinesis:一个 Webhook 出现“(ValidationException) 调用 PutRecord 操作时”错误。然而,另一个 Webhook 成功了
Kinesis: "(ValidationException) when calling the PutRecord operation" error for one Webhook. However another Webhook Succeeds
我正在尝试通过运动传输流将 Punchh webhook JSON 数据流式传输到多个 S3 存储桶,但它给了我这个错误。但是,当我流式传输 Iterable webhook 数据时,Iterable S3 存储桶已成功填充来自 Iterable wehbook 发送的可交付流的数据。
Here is the error:
An error occurred (ValidationException) when calling the PutRecord operation: 1 validation error detected: Value 'data/landing/vendor/punchh/punchh_to_dd/ex_points_reminder' at 'deliveryStreamName' failed to satisfy constraint: Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+
注意: S3 prefix value -in bold- 上面说的错误和我测试成功的Iterable prefixes类似.
这是导致错误的流之一的 Punchh S3 前缀:
data/landing/vendor/punchh/punchh_to_dd/ex-points-reminder/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/dynamic
这是 Punch S3 流的目的地(注意 X 用作占位符以隐藏无关的真实地址):
arn:aws:firehose:us-west-2:XXXXXXXXXXXX:deliverystream/lou-punchh-ex-points-reminder-events
这是代理(后端)Lambda 的 Python 代码(Punchh 的类似代码作为 Iterable)。我有几个动态 S3 存储桶与它们各自的传输流配对:
import boto3
import json
import base64
import os
client = boto3.client('firehose', region_name= 'us-west-2')
delivery_stream_lou = {}
def lambda_handler(event, context):
try:
print(event)
print('<< EVENT BODY >>',event['event_name'])
try:
print('<< EVENT NAME >> ',event['event_name'])
eventName = event['event_name']
eventType = event['event_type']
if 'users' == eventName.lower():
#ADD firehose KINESIS for USERS
print('USERS FOUND')
delivery_stream_lou = os.environ['DSTREAMUSERS']
elif 'loyalty' in eventType.lower():
#ADD firehose KINESIS for LOYALTY
print('LOYALTY FOUND')
delivery_stream_lou = os.environ['DSTREAMLOYALTY']
elif 'gift' == eventType.lower():
#ADD firehose KINESIS for LOYALTY
print('GIFT FOUND')
delivery_stream_lou = os.environ['DSTREAMGIFT']
elif 'redemptions' == eventName.lower():
#ADD firehose KINESIS for REDEMPTIONS
print('REDEMPTIONS FOUND')
delivery_stream_lou = os.environ['DSTREAMREDEMPTIONS']
elif 'rewards' == eventName.lower():
#ADD firehose KINESIS for REWARDS
print('REWARDS FOUND')
delivery_stream_lou = os.environ['DSTREAMREWARDS']
elif 'redeemables' == eventName.lower():
#ADD firehose KINESIS for REDEEMABLES
print('REDEEMABLES FOUND')
delivery_stream_lou = os.environ['DSTREAMREDEEMABLES']
elif 'points_expiry_reminder' == eventType.lower():
#ADD firehose KINESIS for EXPIRY PT REMINDER
print('EXPIRY PT REMINDER FOUND')
delivery_stream_lou = os.environ['DSTREAMEXPOINTSTREMINDER']
print('<< DELIVERY STREAM LOU >> ', delivery_stream_lou)
elif 'points_expiry' == eventType.lower():
#ADD firehose KINESIS for EXPIRY PT REMINDER
print('EXPIRY PT FOUND')
delivery_stream_lou = os.environ['DSTREAMEXPOINTS']
elif 'signup_campaign' == eventType.lower():
#ADD firehose KINESIS for SIGNUP CAMPAIGN
print('SIGNUP FOUND')
delivery_stream_lou = os.environ['DSTREAMSIGNUP']
else:
print('<< ERROR - NO VALID EVENT NAME FOUND !! >>')
if delivery_stream_lou !='' and delivery_stream_lou is not None and delivery_stream_lou != 'null':
jsonData = json.dumps(event) + '\n'
print('<< JSON DATA >> ',jsonData)
response = client.put_record(DeliveryStreamName=delivery_stream_lou, Record={'Data': jsonData})
print(response)
return {
'statusCode': 200,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({"message":"success"})
}
except Exception as e:
print(str(e))
return {
'statusCode': 200,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({"message":"process failed"})
}
except Exception as e:
print(str(e))
return {
'statusCode': 200,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({"message":"process failed 2"})
}
Note: The code is almost identical for this Punchh lambda as the Iterable Lambda except for the different os.environment variables that identify the different buck prefixes.
感谢您的帮助。
根据错误消息 python 代码正在将 data/landing/vendor/punchh/punchh_to_dd/ex_points_reminder
值传递给 PutRecord
请求中的 deliveryStreamName
参数。我认为那不是您的交付流的名称。检查正在填充 deliveryStreamName
参数
的环境变量的值
我正在尝试通过运动传输流将 Punchh webhook JSON 数据流式传输到多个 S3 存储桶,但它给了我这个错误。但是,当我流式传输 Iterable webhook 数据时,Iterable S3 存储桶已成功填充来自 Iterable wehbook 发送的可交付流的数据。
Here is the error:
An error occurred (ValidationException) when calling the PutRecord operation: 1 validation error detected: Value 'data/landing/vendor/punchh/punchh_to_dd/ex_points_reminder' at 'deliveryStreamName' failed to satisfy constraint: Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+
注意: S3 prefix value -in bold- 上面说的错误和我测试成功的Iterable prefixes类似.
这是导致错误的流之一的 Punchh S3 前缀:
data/landing/vendor/punchh/punchh_to_dd/ex-points-reminder/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/dynamic
这是 Punch S3 流的目的地(注意 X 用作占位符以隐藏无关的真实地址):
arn:aws:firehose:us-west-2:XXXXXXXXXXXX:deliverystream/lou-punchh-ex-points-reminder-events
这是代理(后端)Lambda 的 Python 代码(Punchh 的类似代码作为 Iterable)。我有几个动态 S3 存储桶与它们各自的传输流配对:
import boto3
import json
import base64
import os
client = boto3.client('firehose', region_name= 'us-west-2')
delivery_stream_lou = {}
def lambda_handler(event, context):
try:
print(event)
print('<< EVENT BODY >>',event['event_name'])
try:
print('<< EVENT NAME >> ',event['event_name'])
eventName = event['event_name']
eventType = event['event_type']
if 'users' == eventName.lower():
#ADD firehose KINESIS for USERS
print('USERS FOUND')
delivery_stream_lou = os.environ['DSTREAMUSERS']
elif 'loyalty' in eventType.lower():
#ADD firehose KINESIS for LOYALTY
print('LOYALTY FOUND')
delivery_stream_lou = os.environ['DSTREAMLOYALTY']
elif 'gift' == eventType.lower():
#ADD firehose KINESIS for LOYALTY
print('GIFT FOUND')
delivery_stream_lou = os.environ['DSTREAMGIFT']
elif 'redemptions' == eventName.lower():
#ADD firehose KINESIS for REDEMPTIONS
print('REDEMPTIONS FOUND')
delivery_stream_lou = os.environ['DSTREAMREDEMPTIONS']
elif 'rewards' == eventName.lower():
#ADD firehose KINESIS for REWARDS
print('REWARDS FOUND')
delivery_stream_lou = os.environ['DSTREAMREWARDS']
elif 'redeemables' == eventName.lower():
#ADD firehose KINESIS for REDEEMABLES
print('REDEEMABLES FOUND')
delivery_stream_lou = os.environ['DSTREAMREDEEMABLES']
elif 'points_expiry_reminder' == eventType.lower():
#ADD firehose KINESIS for EXPIRY PT REMINDER
print('EXPIRY PT REMINDER FOUND')
delivery_stream_lou = os.environ['DSTREAMEXPOINTSTREMINDER']
print('<< DELIVERY STREAM LOU >> ', delivery_stream_lou)
elif 'points_expiry' == eventType.lower():
#ADD firehose KINESIS for EXPIRY PT REMINDER
print('EXPIRY PT FOUND')
delivery_stream_lou = os.environ['DSTREAMEXPOINTS']
elif 'signup_campaign' == eventType.lower():
#ADD firehose KINESIS for SIGNUP CAMPAIGN
print('SIGNUP FOUND')
delivery_stream_lou = os.environ['DSTREAMSIGNUP']
else:
print('<< ERROR - NO VALID EVENT NAME FOUND !! >>')
if delivery_stream_lou !='' and delivery_stream_lou is not None and delivery_stream_lou != 'null':
jsonData = json.dumps(event) + '\n'
print('<< JSON DATA >> ',jsonData)
response = client.put_record(DeliveryStreamName=delivery_stream_lou, Record={'Data': jsonData})
print(response)
return {
'statusCode': 200,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({"message":"success"})
}
except Exception as e:
print(str(e))
return {
'statusCode': 200,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({"message":"process failed"})
}
except Exception as e:
print(str(e))
return {
'statusCode': 200,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({"message":"process failed 2"})
}
Note: The code is almost identical for this Punchh lambda as the Iterable Lambda except for the different os.environment variables that identify the different buck prefixes.
感谢您的帮助。
根据错误消息 python 代码正在将 data/landing/vendor/punchh/punchh_to_dd/ex_points_reminder
值传递给 PutRecord
请求中的 deliveryStreamName
参数。我认为那不是您的交付流的名称。检查正在填充 deliveryStreamName
参数