如何在不再次通过 API 网关和授权方的情况下直接从另一个 Lambda 调用 Chalice Lambda?
How to directly invoke a Chalice Lambda from another Lambda without going through the API Gateway and Authorizer again?
我有 2 个 Lambda 使用 Chalice 在 API 网关后面。所有端点均受 Cognito 授权方保护。
当我调用 GET /hello
时,我希望第一个 Lambda 从第二个 Lambda 获取数据:
# Lambda1
@app.route('/hello')
def say_hello():
name = # fetch data from Lambda2
return Response({'message': 'hello ' + name})
# Lambda2
@app.route('/name')
def a_name():
return Response({'name': 'GLaDOS'})
此请求不必再次通过 API 网关 + 授权方,因为它会产生不必要的开销。
如何直接调用?
编辑:仅当您出于任何原因无法获得 Internal API Gateway 时,我才推荐以下解决方案。
使用下面的代码,我可以直接成功调用另一个 Chalice Lambda(无需再次通过 API 网关 + 授权方):
def invoke_sync(lambda_name: str,
http_method: str,
path: str,
claims: dict,
path_parameters: dict = None,
http_request_body: str = None,
query_string_parameters: dict = None,
headers: dict = None):
invoke_payload = {
'path': path,
'httpMethod': http_method,
'headers': headers,
'multiValueHeaders': {},
'queryStringParameters': query_string_parameters,
'multiValueQueryStringParameters': None,
'pathParameters': path_parameters,
'stageVariables': None,
'requestContext': {
'authorizer': {'claims': claims},
'path': path,
'resourcePath': path,
'httpMethod': http_method,
},
'body': http_request_body,
}
lambda_response = boto3.client('lambda').invoke(FunctionName=lambda_name,
InvocationType='RequestResponse',
Payload=json.dumps(invoke_payload))
payload = json.loads(lambda_response['Payload'].read())
status_code = payload['statusCode']
...
用法示例:
invoke_sync(
lambda_name='Users',
claims=<claims-in-the-id-token>,
http_method='GET',
path='/users/{userId}',
path_parameters={'userId': 123}
)
这个调用是同步的。要创建上述的异步版本,请使用 InvocationType='Event'
as documented here.
请注意,如果使用其他语言或框架,上面使用的负载将完全相同,因为这是从 API 网关发送到 Lambda 函数的格式。
我有 2 个 Lambda 使用 Chalice 在 API 网关后面。所有端点均受 Cognito 授权方保护。
当我调用 GET /hello
时,我希望第一个 Lambda 从第二个 Lambda 获取数据:
# Lambda1
@app.route('/hello')
def say_hello():
name = # fetch data from Lambda2
return Response({'message': 'hello ' + name})
# Lambda2
@app.route('/name')
def a_name():
return Response({'name': 'GLaDOS'})
此请求不必再次通过 API 网关 + 授权方,因为它会产生不必要的开销。
如何直接调用?
编辑:仅当您出于任何原因无法获得 Internal API Gateway 时,我才推荐以下解决方案。
使用下面的代码,我可以直接成功调用另一个 Chalice Lambda(无需再次通过 API 网关 + 授权方):
def invoke_sync(lambda_name: str,
http_method: str,
path: str,
claims: dict,
path_parameters: dict = None,
http_request_body: str = None,
query_string_parameters: dict = None,
headers: dict = None):
invoke_payload = {
'path': path,
'httpMethod': http_method,
'headers': headers,
'multiValueHeaders': {},
'queryStringParameters': query_string_parameters,
'multiValueQueryStringParameters': None,
'pathParameters': path_parameters,
'stageVariables': None,
'requestContext': {
'authorizer': {'claims': claims},
'path': path,
'resourcePath': path,
'httpMethod': http_method,
},
'body': http_request_body,
}
lambda_response = boto3.client('lambda').invoke(FunctionName=lambda_name,
InvocationType='RequestResponse',
Payload=json.dumps(invoke_payload))
payload = json.loads(lambda_response['Payload'].read())
status_code = payload['statusCode']
...
用法示例:
invoke_sync(
lambda_name='Users',
claims=<claims-in-the-id-token>,
http_method='GET',
path='/users/{userId}',
path_parameters={'userId': 123}
)
这个调用是同步的。要创建上述的异步版本,请使用 InvocationType='Event'
as documented here.
请注意,如果使用其他语言或框架,上面使用的负载将完全相同,因为这是从 API 网关发送到 Lambda 函数的格式。