Can I determine which AWS service triggered my Lambda function?
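I have a Python Lambda function that can respond to both an IoT Button and an Alexa skill.
Is there a way to read the event or context passed to the handler function to determine which service triggered the function (Alexa or IoT)?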
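You can log the event and read it in CloudWatch, like this:
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def my_logging_handler(event, context):
    # Write the raw event to the function's CloudWatch log stream
    logger.info('got event {}'.format(event))
This records the event data in CloudWatch, where you can inspect it and work out which service triggered the Lambda.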
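If the raw dictionary is hard to read in the logs, a slightly more structured variant may help. The following is only a sketch: `summarize_event` is a hypothetical helper introduced here for illustration, and it assumes the event is a dict, which is typical but not guaranteed.

```python
import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def summarize_event(event):
    """Return the sorted top-level keys of the event (hypothetical helper)."""
    return sorted(event.keys()) if isinstance(event, dict) else []


def my_logging_handler(event, context):
    # The top-level keys are usually enough to recognise the event's shape.
    logger.info("event keys: %s", summarize_event(event))
    # default=str keeps json.dumps from failing on values that are not JSON-serializable.
    logger.info("event body: %s", json.dumps(event, default=str))
    return summarize_event(event)
```

Comparing the logged key lists for an IoT Button press and an Alexa invocation is a quick way to find a key that differs between the two.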
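There is no reliable way to do this. The closest you can get is to familiarize yourself with the contents of the events that the different services generate and (hopefully) identify a key that is reliably present and unique in each type of event you care about, which you can then check for in your code, e.g.
if 'distinctKey' in event:
    pass  # handle events from this service
However, this is not a robust approach, because it requires you to
- examine every possible event structure generated by each potential service, and
- successfully and confidently identify, for each service of interest, a key or set of keys that is always present in that service's events and unique to them.
I was hoping AWS would have simplified this by now, but unfortunately it has not. There is no single parameter you can inspect to determine the event type across all AWS services.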
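For the two services in the question, the key-checking idea might look like the sketch below. It rests on assumptions that should be verified against your own logged events: that IoT Button events carry `serialNumber`, `clickType` and `batteryVoltage`, and that Alexa requests carry `version` plus a `request` object with a `type`. The function name `guess_trigger` is made up for this example.

```python
# Assumed key sets; confirm them against events logged from your own devices.
IOT_BUTTON_KEYS = {"serialNumber", "clickType", "batteryVoltage"}
ALEXA_KEYS = {"version", "request"}


def guess_trigger(event):
    """Best-effort guess at the caller, based only on the event's shape."""
    if not isinstance(event, dict):
        return "unknown"
    keys = set(event)
    if IOT_BUTTON_KEYS <= keys:
        return "iot_button"
    request = event.get("request")
    if ALEXA_KEYS <= keys and isinstance(request, dict) and "type" in request:
        return "alexa"
    return "unknown"


def handler(event, context):
    source = guess_trigger(event)
    if source == "iot_button":
        return {"handled": "button", "click": event["clickType"]}
    if source == "alexa":
        return {"handled": "alexa", "request_type": event["request"]["type"]}
    return {"handled": "unknown"}
```

Returning "unknown" rather than guessing keeps the function from silently misrouting an event whose shape it has not seen before.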
However, I found this nicely articulated version online:
function getLambdaEventSource(event) {
  if (event.Records && event.Records[0].cf) return 'isCloudfront';
  if (event.configRuleId && event.configRuleName && event.configRuleArn) return 'isAwsConfig';
  if (event.Records && (event.Records[0].eventSource === 'aws:codecommit')) return 'isCodeCommit';
  if (event.authorizationToken === "incoming-client-token") return 'isApiGatewayAuthorizer';
  if (event.StackId && event.RequestType && event.ResourceType) return 'isCloudFormation';
  if (event.Records && (event.Records[0].eventSource === 'aws:ses')) return 'isSes';
  if (event.pathParameters && event.pathParameters.proxy) return 'isApiGatewayAwsProxy';
  if (event.source === 'aws.events') return 'isScheduledEvent';
  if (event.awslogs && event.awslogs.data) return 'isCloudWatchLogs';
  if (event.Records && (event.Records[0].EventSource === 'aws:sns')) return 'isSns';
  if (event.Records && (event.Records[0].eventSource === 'aws:dynamodb')) return 'isDynamoDb';
  if (event.records && event.records[0].approximateArrivalTimestamp) return 'isKinesisFirehose';
  if (event.records && event.deliveryStreamArn && event.deliveryStreamArn.startsWith('arn:aws:kinesis:')) return 'isKinesisFirehose';
  if (event.eventType === 'SyncTrigger' && event.identityId && event.identityPoolId) return 'isCognitoSyncTrigger';
  if (event.Records && event.Records[0].eventSource === 'aws:kinesis') return 'isKinesis';
  if (event.Records && event.Records[0].eventSource === 'aws:s3') return 'isS3';
  if (event.operation && event.message) return 'isMobileBackend';
}
For Python developers: you can take a look at https://gist.github.com/Necromancerx/abed07138690d37d170a6cf15b40d749.
def get_lambda_event_source(event: dict):
    # Guess the triggering service from the shape of the event payload.
    # pathParameters can be present but null, so check its value before using 'in'.
    if event.get('pathParameters') and 'proxy' in event['pathParameters']:
        return 'api_gateway_aws_proxy'
    elif 'Records' in event and len(event['Records']) > 0 and 'eventSource' in event['Records'][0] and event['Records'][0]['eventSource'] == 'aws:s3':
        return 's3'
    # SNS records spell the key 'EventSource' with a capital E.
    elif 'Records' in event and len(event['Records']) > 0 and 'EventSource' in event['Records'][0] and event['Records'][0]['EventSource'] == 'aws:sns':
        return 'sns'
    elif 'Records' in event and len(event['Records']) > 0 and 'eventSource' in event['Records'][0] and event['Records'][0]['eventSource'] == 'aws:dynamodb':
        return 'dynamo_db'
    elif 'Records' in event and len(event['Records']) > 0 and 'cf' in event['Records'][0]:
        return 'cloudfront'
    elif 'source' in event and event['source'] == 'aws.events':
        return 'scheduled_event'
    elif 'awslogs' in event and 'data' in event['awslogs']:
        return 'cloud_watch_logs'
    elif 'authorizationToken' in event and event['authorizationToken'] == "incoming-client-token":
        return 'api_gateway_authorizer'
    elif 'configRuleId' in event and 'configRuleName' in event and 'configRuleArn' in event:
        return 'aws_config'
    elif 'StackId' in event and 'RequestType' in event and 'ResourceType' in event:
        return 'cloud_formation'
    elif 'Records' in event and len(event['Records']) > 0 and 'eventSource' in event['Records'][0] and event['Records'][0]['eventSource'] == 'aws:codecommit':
        return 'code_commit'
    elif 'Records' in event and len(event['Records']) > 0 and 'eventSource' in event['Records'][0] and event['Records'][0]['eventSource'] == 'aws:ses':
        return 'ses'
    elif 'Records' in event and len(event['Records']) > 0 and 'eventSource' in event['Records'][0] and event['Records'][0]['eventSource'] == 'aws:kinesis':
        return 'kinesis'
    # Firehose uses a lowercase 'records' key, so index the same key that was tested.
    elif 'records' in event and len(event['records']) > 0 and 'approximateArrivalTimestamp' in event['records'][0]:
        return 'kinesis_firehose'
    elif 'records' in event and len(event['records']) > 0 and 'deliveryStreamArn' in event and isinstance(event['deliveryStreamArn'], str) and event['deliveryStreamArn'].startswith('arn:aws:kinesis:'):
        return 'kinesis_firehose'
    elif 'eventType' in event and event['eventType'] == 'SyncTrigger' and 'identityId' in event and 'identityPoolId' in event:
        return 'cognito_sync_trigger'
    elif 'operation' in event and 'message' in event:
        return 'is_mobile_backend'
It is based on this JavaScript gist: https://gist.github.com/jeshan/52cb021fd20d871c56ad5ce6d2654d7b
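Many of the branches in both versions repeat the same lookup on the first entry of `Records`. As a possible simplification, that lookup could be pulled into one helper. This is a sketch rather than part of either gist; `record_event_source` is a hypothetical name, and it only covers services that report themselves through `eventSource` or `EventSource`.

```python
def record_event_source(event):
    """Return the first record's event source string, or None if there is none."""
    records = event.get("Records") if isinstance(event, dict) else None
    if not isinstance(records, list) or not records:
        return None
    first = records[0]
    if not isinstance(first, dict):
        return None
    # SNS capitalises the key ('EventSource'); the other services listed above use 'eventSource'.
    return first.get("eventSource") or first.get("EventSource")
```

A caller could then compare the returned value against strings such as `'aws:s3'` or `'aws:sns'` once, instead of repeating the length and key checks in every branch.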