如何使用控制台在 aws SNS 中发送 json 消息正文
How to send json message body in aws SNS using console
我正在尝试将 SNS 触发器添加到 lambda 函数,然后将消息发送到松弛通道。 python 中有此 lambda 的蓝图以及如下所示的模板测试事件
{
"Records": [
{
"EventVersion": "1.0",
"EventSubscriptionArn": "arn:aws:sns:EXAMPLE",
"EventSource": "aws:sns",
"Sns": {
"SignatureVersion": "1",
"Timestamp": "1970-01-01T00:00:00.000Z",
"Signature": "EXAMPLE",
"SigningCertUrl": "EXAMPLE",
"MessageId": "12345",
"Message": {
"AlarmName": "SlackAlarm",
"NewStateValue": "OK",
"NewStateReason": "Threshold Crossed: 1 datapoint (0.0) was not greater than or equal to the threshold (1.0)."
},
"MessageAttributes": {
"Test": {
"Type": "String",
"Value": "TestString"
},
"TestBinary": {
"Type": "Binary",
"Value": "TestBinary"
}
},
"Type": "Notification",
"UnsubscribeUrl": "EXAMPLE",
"TopicArn": "arn:aws:sns:EXAMPLE",
"Subject": "TestInvoke"
}
}
]
蓝图中的lambda handler代码如下
import boto3
import json
import logging
import os
from base64 import b64decode
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
HOOK_URL = os.environ['kmsEncryptedHookUrl']
SLACK_CHANNEL = os.environ['slackChannel']
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
logger.info("Event: " + str(event))
message = event['Records'][0]['Sns']['Message']
logger.info("Message: " + str(message))
alarm_name = message['AlarmName']
new_state = message['NewStateValue']
reason = message['NewStateReason']
slack_message = {
'channel': SLACK_CHANNEL,
'text': "%s state is now %s: %s" % (alarm_name, new_state, reason)
}
req = Request(HOOK_URL, json.dumps(slack_message).encode('utf-8'))
try:
response = urlopen(req)
response.read()
logger.info("Message posted to %s", slack_message['channel'])
except HTTPError as e:
logger.error("Request failed: %d %s", e.code, e.reason)
except URLError as e:
logger.error("Server connection failed: %s", e.reason)
当我 运行 测试事件时,lambda 运行 成功。
我想从控制台在 SNS 主题中发布一条消息,以查看是否正确触发了 lambda。但是当我尝试将 JSON 对象发布为消息正文时,出现错误
[ERROR] TypeError: string indices must be integersTraceback (most recent call last): File "/var/task/lambda_function.py", line 21, in lambda_handler alarm_name = message['AlarmName']
我试着给出简单的 json
{
"AlarmName": "PublishedAlarm",
"NewStateValue": "OK",
"NewStateReason": "This alarm is published"
}
我试着给出一个字符串化的 JSON
"{\"AlarmName\": \"PublishedAlarm\",\"NewStateValue\": \"OK\",\"NewStateReason\": \"This alarm is published\"}"
我尝试选择 Custom payload for each delivery
消息结构,然后给出了以下消息正文
{
"default": "Sample fallback message",
"email": "Sample message for email endpoints",
"sqs": "Sample message for Amazon SQS endpoints",
"lambda": "{\"AlarmName\": \"PublishedAlarm\",\"NewStateValue\": \"OK\",\"NewStateReason\": \"This alarm is published\"}",
"http": "Sample message for HTTP endpoints",
"https": "Sample message for HTTPS endpoints",
"sms": "Sample message for SMS endpoints",
"firehose": "Sample message for Amazon Kinesis Data Firehose endpoints",
"APNS": "{\"aps\":{\"alert\": \"Sample message for iOS endpoints\"} }",
"APNS_SANDBOX": "{\"aps\":{\"alert\":\"Sample message for iOS development endpoints\"}}",
"APNS_VOIP": "{\"aps\":{\"alert\":\"Sample message for Apple VoIP endpoints\"}}",
"APNS_VOIP_SANDBOX": "{\"aps\":{\"alert\": \"Sample message for Apple VoIP development endpoints\"} }",
"MACOS": "{\"aps\":{\"alert\":\"Sample message for MacOS endpoints\"}}",
"MACOS_SANDBOX": "{\"aps\":{\"alert\": \"Sample message for MacOS development endpoints\"} }",
"GCM": "{ \"data\": { \"message\": \"Sample message for Android endpoints\" } }",
"ADM": "{ \"data\": { \"message\": \"Sample message for FireOS endpoints\" } }",
"BAIDU": "{\"title\":\"Sample message title\",\"description\":\"Sample message for Baidu endpoints\"}",
"MPNS": "<?xml version=\"1.0\" encoding=\"utf-8\"?><wp:Notification xmlns:wp=\"WPNotification\"><wp:Tile><wp:Count>ENTER COUNT</wp:Count><wp:Title>Sample message for Windows Phone 7+ endpoints</wp:Title></wp:Tile></wp:Notification>",
"WNS": "<badge version=\"1\" value=\"42\"/>"
}
没有任何效果。我还订阅了该主题的电子邮件地址,我收到的电子邮件没有任何问题。
如何从 SNS 模拟 lambda 事件模板中给出的测试事件?
当您使用 SNS 发送纯 json 消息时,它将以以下格式传送到 lambda:
'Message': '{\n "AlarmName": "PublishedAlarm",\n "NewStateValue": "OK",\n "NewStateReason": "This alarm is published"\n}'
您可以使用 ast' literal_eval
方法解析它:
import ast
#...
#...
def lambda_handler(event, context):
logger.info("Event: " + str(event))
message = event['Records'][0]['Sns']['Message']
logger.info("Message: " + str(message))
message = ast.literal_eval(event['Records'][0]['Sns']['Message'])
alarm_name = message['AlarmName']
new_state = message['NewStateValue']
reason = message['NewStateReason']
#...
#...
我正在尝试将 SNS 触发器添加到 lambda 函数,然后将消息发送到松弛通道。 python 中有此 lambda 的蓝图以及如下所示的模板测试事件
{
"Records": [
{
"EventVersion": "1.0",
"EventSubscriptionArn": "arn:aws:sns:EXAMPLE",
"EventSource": "aws:sns",
"Sns": {
"SignatureVersion": "1",
"Timestamp": "1970-01-01T00:00:00.000Z",
"Signature": "EXAMPLE",
"SigningCertUrl": "EXAMPLE",
"MessageId": "12345",
"Message": {
"AlarmName": "SlackAlarm",
"NewStateValue": "OK",
"NewStateReason": "Threshold Crossed: 1 datapoint (0.0) was not greater than or equal to the threshold (1.0)."
},
"MessageAttributes": {
"Test": {
"Type": "String",
"Value": "TestString"
},
"TestBinary": {
"Type": "Binary",
"Value": "TestBinary"
}
},
"Type": "Notification",
"UnsubscribeUrl": "EXAMPLE",
"TopicArn": "arn:aws:sns:EXAMPLE",
"Subject": "TestInvoke"
}
}
]
蓝图中的lambda handler代码如下
import boto3
import json
import logging
import os
from base64 import b64decode
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
HOOK_URL = os.environ['kmsEncryptedHookUrl']
SLACK_CHANNEL = os.environ['slackChannel']
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
logger.info("Event: " + str(event))
message = event['Records'][0]['Sns']['Message']
logger.info("Message: " + str(message))
alarm_name = message['AlarmName']
new_state = message['NewStateValue']
reason = message['NewStateReason']
slack_message = {
'channel': SLACK_CHANNEL,
'text': "%s state is now %s: %s" % (alarm_name, new_state, reason)
}
req = Request(HOOK_URL, json.dumps(slack_message).encode('utf-8'))
try:
response = urlopen(req)
response.read()
logger.info("Message posted to %s", slack_message['channel'])
except HTTPError as e:
logger.error("Request failed: %d %s", e.code, e.reason)
except URLError as e:
logger.error("Server connection failed: %s", e.reason)
当我 运行 测试事件时,lambda 运行 成功。
我想从控制台在 SNS 主题中发布一条消息,以查看是否正确触发了 lambda。但是当我尝试将 JSON 对象发布为消息正文时,出现错误
[ERROR] TypeError: string indices must be integersTraceback (most recent call last): File "/var/task/lambda_function.py", line 21, in lambda_handler alarm_name = message['AlarmName']
我试着给出简单的 json
{
"AlarmName": "PublishedAlarm",
"NewStateValue": "OK",
"NewStateReason": "This alarm is published"
}
我试着给出一个字符串化的 JSON
"{\"AlarmName\": \"PublishedAlarm\",\"NewStateValue\": \"OK\",\"NewStateReason\": \"This alarm is published\"}"
我尝试选择 Custom payload for each delivery
消息结构,然后给出了以下消息正文
{
"default": "Sample fallback message",
"email": "Sample message for email endpoints",
"sqs": "Sample message for Amazon SQS endpoints",
"lambda": "{\"AlarmName\": \"PublishedAlarm\",\"NewStateValue\": \"OK\",\"NewStateReason\": \"This alarm is published\"}",
"http": "Sample message for HTTP endpoints",
"https": "Sample message for HTTPS endpoints",
"sms": "Sample message for SMS endpoints",
"firehose": "Sample message for Amazon Kinesis Data Firehose endpoints",
"APNS": "{\"aps\":{\"alert\": \"Sample message for iOS endpoints\"} }",
"APNS_SANDBOX": "{\"aps\":{\"alert\":\"Sample message for iOS development endpoints\"}}",
"APNS_VOIP": "{\"aps\":{\"alert\":\"Sample message for Apple VoIP endpoints\"}}",
"APNS_VOIP_SANDBOX": "{\"aps\":{\"alert\": \"Sample message for Apple VoIP development endpoints\"} }",
"MACOS": "{\"aps\":{\"alert\":\"Sample message for MacOS endpoints\"}}",
"MACOS_SANDBOX": "{\"aps\":{\"alert\": \"Sample message for MacOS development endpoints\"} }",
"GCM": "{ \"data\": { \"message\": \"Sample message for Android endpoints\" } }",
"ADM": "{ \"data\": { \"message\": \"Sample message for FireOS endpoints\" } }",
"BAIDU": "{\"title\":\"Sample message title\",\"description\":\"Sample message for Baidu endpoints\"}",
"MPNS": "<?xml version=\"1.0\" encoding=\"utf-8\"?><wp:Notification xmlns:wp=\"WPNotification\"><wp:Tile><wp:Count>ENTER COUNT</wp:Count><wp:Title>Sample message for Windows Phone 7+ endpoints</wp:Title></wp:Tile></wp:Notification>",
"WNS": "<badge version=\"1\" value=\"42\"/>"
}
没有任何效果。我还订阅了该主题的电子邮件地址,我收到的电子邮件没有任何问题。
如何从 SNS 模拟 lambda 事件模板中给出的测试事件?
当您使用 SNS 发送纯 json 消息时,它将以以下格式传送到 lambda:
'Message': '{\n "AlarmName": "PublishedAlarm",\n "NewStateValue": "OK",\n "NewStateReason": "This alarm is published"\n}'
您可以使用 ast' literal_eval
方法解析它:
import ast
#...
#...
def lambda_handler(event, context):
logger.info("Event: " + str(event))
message = event['Records'][0]['Sns']['Message']
logger.info("Message: " + str(message))
message = ast.literal_eval(event['Records'][0]['Sns']['Message'])
alarm_name = message['AlarmName']
new_state = message['NewStateValue']
reason = message['NewStateReason']
#...
#...