API 使用 CDK 到 Lambda 的网关有问题
Trouble with API Gateway to Lambda using CDK
我正在使用 Python CDK 部署由 API 调用触发的 Lambda 函数。这是我的代码:
app.py
#!/usr/bin/env python3
import os
from aws_cdk import core
from cvsg_app_bff.usermanagement import UserManagementStack
app = core.App()
UserManagementStack(app, "UserManagementStack")
app.synth()
我的堆栈:
class UserManagementStack(core.Stack):
def __init__(self, scope: core.Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Lambda functions
create_new_user: _lambda.Function = _lambda.Function(self, "newUserLambdaHandler",
runtime=_lambda.Runtime.PYTHON_3_8,
handler="usermanagement.create_new_user",
code=_lambda.Code.from_asset("lambda_functions")
)
# API gateway setup
user_apis = api_gw.LambdaRestApi(self, 'UserManagementAPIs', handler=create_new_user, proxy=False)
user_apis.root.resource_for_path('user').add_method('POST', api_gw.LambdaIntegration(create_new_user))
包含 Lambda 函数的文件:
def format_return(status_code, response_body):
response = {
'statusCode': status_code,
'body': response_body
}
print("Response: ", str(response))
return response
def is_admin(event):
# todo: not implemented yet!
return True
def get_event_body(event):
print(json.dumps(event))
return json.loads(event["body"])
def create_new_user(event, context):
request_body = get_event_body(event)
if is_admin(event):
return format_return(200, request_body)
else:
return format_return(403, "Not Unauthorised for this operation")
当我部署并命中端点时,我收到带有此消息的 502 Bad Gateway 响应
{
"message": "Internal server error"
}
我查看了 CloudWatch 日志,我可以看到在 format_response
方法中打印了正确的响应,但是客户端访问端点的响应始终是 502。
如何让 API 网关获得 return 正确的响应?
您的问题似乎是您的 Lambda 函数 return 响应不正确。 API 网关通过 Lambda 代理集成调用的 Lambda 函数需要 return 以下格式的 JSON 对象:
{
"isBase64Encoded": True|False,
"statusCode": <status code>,
"headers": {
"header-name": "header-value",
# ...
},
"body": "{\"your\":\"data\"}" # <--- MUST be a string, NOT an object
}
您的 create_new_user
函数 returns format_return(200, request_body)
,其中 request_body
是 get_event_body
的结果,其中 returns json.loads
(其中 return 是 dict,不是字符串)。
像这样更改 get_event_body
应该可以解决问题:
def get_event_body(event):
print(json.dumps(event))
return json.dumps(event["body"])
我正在使用 Python CDK 部署由 API 调用触发的 Lambda 函数。这是我的代码:
app.py
#!/usr/bin/env python3
import os
from aws_cdk import core
from cvsg_app_bff.usermanagement import UserManagementStack
app = core.App()
UserManagementStack(app, "UserManagementStack")
app.synth()
我的堆栈:
class UserManagementStack(core.Stack):
def __init__(self, scope: core.Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Lambda functions
create_new_user: _lambda.Function = _lambda.Function(self, "newUserLambdaHandler",
runtime=_lambda.Runtime.PYTHON_3_8,
handler="usermanagement.create_new_user",
code=_lambda.Code.from_asset("lambda_functions")
)
# API gateway setup
user_apis = api_gw.LambdaRestApi(self, 'UserManagementAPIs', handler=create_new_user, proxy=False)
user_apis.root.resource_for_path('user').add_method('POST', api_gw.LambdaIntegration(create_new_user))
包含 Lambda 函数的文件:
def format_return(status_code, response_body):
response = {
'statusCode': status_code,
'body': response_body
}
print("Response: ", str(response))
return response
def is_admin(event):
# todo: not implemented yet!
return True
def get_event_body(event):
print(json.dumps(event))
return json.loads(event["body"])
def create_new_user(event, context):
request_body = get_event_body(event)
if is_admin(event):
return format_return(200, request_body)
else:
return format_return(403, "Not Unauthorised for this operation")
当我部署并命中端点时,我收到带有此消息的 502 Bad Gateway 响应
{
"message": "Internal server error"
}
我查看了 CloudWatch 日志,我可以看到在 format_response
方法中打印了正确的响应,但是客户端访问端点的响应始终是 502。
如何让 API 网关获得 return 正确的响应?
您的问题似乎是您的 Lambda 函数 return 响应不正确。 API 网关通过 Lambda 代理集成调用的 Lambda 函数需要 return 以下格式的 JSON 对象:
{
"isBase64Encoded": True|False,
"statusCode": <status code>,
"headers": {
"header-name": "header-value",
# ...
},
"body": "{\"your\":\"data\"}" # <--- MUST be a string, NOT an object
}
您的 create_new_user
函数 returns format_return(200, request_body)
,其中 request_body
是 get_event_body
的结果,其中 returns json.loads
(其中 return 是 dict,不是字符串)。
像这样更改 get_event_body
应该可以解决问题:
def get_event_body(event):
print(json.dumps(event))
return json.dumps(event["body"])