多个并行 AWS Lambda 调用
Multiple parallel AWS Lambda invocations
我正在尝试使用 python 3.7.2 和 aiobotocore 包执行多个 AWS Lambda 调用。这是我的代码。
import asyncio
import aiobotocore
async def invoke(payload, session):
async with session.create_client('lambda', region_name='us-east-1') as client:
return await client.invoke(FunctionName='MY_FUNCTION', Payload=payload)
def generate_invocations(payloads, session):
for payload in payloads:
yield invoke(payload, session)
def invoke_all(payloads):
loop = asyncio.get_event_loop()
async def wrapped():
session = aiobotocore.get_session(loop=loop)
invocations = generate_invocations(payloads, session)
return await asyncio.gather(*invocations)
return loop.run_until_complete(wrapped())
def main():
payloads_list = [] # MY PAYLOADS LIST
lambda_responses = invoke_all(payloads_list)
print(lambda_responses)
if __name__ == '__main__':
main()
代码运行得非常快(10 个有效载荷大约 1 秒,而不是使用 boto3 lambda 客户端调用的 15 个有效载荷)但我有两个问题:
1) lambda_responses中的元素包括'Payload'键,其值是aiobotocore.response.StreamingBody类型。当我尝试 value.read() 时,我收到 "coroutine object StreamingBody.read" 并且我认为我的代码中存在一些问题。我可以通过 "json.loads(json.loads(r['Payload']._buffer.pop())['body'])" 收到所需的响应,但获得它的正确方法是什么。
2) 在极少数情况下,其中一个响应中的 "Payload" 缓冲区为空。如何确保 invoke_all 函数 returns 非空响应? aiobotocore 的用法正确吗?
我是 python 3 和异步功能的新手。受 aiobotocore documentation and Mathew Marcus blog.
示例的启发
谢谢!
- The elements in lambda_responses include 'Payload' key which value is of type aiobotocore.response.StreamingBody. When I try value.read() I receive "coroutine object StreamingBody.read"
这意味着 read()
协程需要等待,您应该在事件循环中执行此操作。例如,您可以更改 invoke
协程以同时读取响应:
async def invoke(payload, session):
async with session.create_client('lambda', region_name='us-east-1') as client:
resp = await client.invoke(FunctionName='MY_FUNCTION', Payload=payload)
payload = await resp['Payload'].read()
return payload # or assemble a dict with relevant parts
- In rare cases the "Payload" in one of responses has empty buffer.
这可能是因为您在实际读取内容之前访问了缓冲区。在某些情况下,信息到达的速度足够快,您无论如何都可以在内部缓冲区中找到它,但有时您必须等待它。使用 public 方法,例如 read()
可确保您正确使用 API。另一方面,_buffer
属性 以下划线开头,表示它是一个实现细节,不能直接访问。
我正在尝试使用 python 3.7.2 和 aiobotocore 包执行多个 AWS Lambda 调用。这是我的代码。
import asyncio
import aiobotocore
async def invoke(payload, session):
async with session.create_client('lambda', region_name='us-east-1') as client:
return await client.invoke(FunctionName='MY_FUNCTION', Payload=payload)
def generate_invocations(payloads, session):
for payload in payloads:
yield invoke(payload, session)
def invoke_all(payloads):
loop = asyncio.get_event_loop()
async def wrapped():
session = aiobotocore.get_session(loop=loop)
invocations = generate_invocations(payloads, session)
return await asyncio.gather(*invocations)
return loop.run_until_complete(wrapped())
def main():
payloads_list = [] # MY PAYLOADS LIST
lambda_responses = invoke_all(payloads_list)
print(lambda_responses)
if __name__ == '__main__':
main()
代码运行得非常快(10 个有效载荷大约 1 秒,而不是使用 boto3 lambda 客户端调用的 15 个有效载荷)但我有两个问题:
1) lambda_responses中的元素包括'Payload'键,其值是aiobotocore.response.StreamingBody类型。当我尝试 value.read() 时,我收到 "coroutine object StreamingBody.read" 并且我认为我的代码中存在一些问题。我可以通过 "json.loads(json.loads(r['Payload']._buffer.pop())['body'])" 收到所需的响应,但获得它的正确方法是什么。
2) 在极少数情况下,其中一个响应中的 "Payload" 缓冲区为空。如何确保 invoke_all 函数 returns 非空响应? aiobotocore 的用法正确吗?
我是 python 3 和异步功能的新手。受 aiobotocore documentation and Mathew Marcus blog.
示例的启发谢谢!
- The elements in lambda_responses include 'Payload' key which value is of type aiobotocore.response.StreamingBody. When I try value.read() I receive "coroutine object StreamingBody.read"
这意味着 read()
协程需要等待,您应该在事件循环中执行此操作。例如,您可以更改 invoke
协程以同时读取响应:
async def invoke(payload, session):
async with session.create_client('lambda', region_name='us-east-1') as client:
resp = await client.invoke(FunctionName='MY_FUNCTION', Payload=payload)
payload = await resp['Payload'].read()
return payload # or assemble a dict with relevant parts
- In rare cases the "Payload" in one of responses has empty buffer.
这可能是因为您在实际读取内容之前访问了缓冲区。在某些情况下,信息到达的速度足够快,您无论如何都可以在内部缓冲区中找到它,但有时您必须等待它。使用 public 方法,例如 read()
可确保您正确使用 API。另一方面,_buffer
属性 以下划线开头,表示它是一个实现细节,不能直接访问。