异步 io 并在一个任务中调用 Lambda,在另一个任务中调用步骤函数,return 第一个完成

Async io and calling a Lambda in one task and a step function in the other, return the first that completes

所以,我们正在构建遗留系统的替代品。此替换是状态机(步进函数)中的一系列 lambda。

在推出期间,我们会复制传入的请求,将一份副本发送到我们的 Step Function,并将另一端发送到遗留系统。如果新系统没有失败,那么它会先return并取消另一个线程。

我们这样做是为了如果我们的新系统出现故障,它仍然会从旧系统中得到响应。鉴于我们的系统比旧系统快 3 到 4 倍,我们一开始就依靠它来防止旧系统 return 在我们的系统之前运行(是的,我们很清楚并且已经设计的潜在竞争条件为了)。我们使用 asyncio 和 aiohttp 以及 async.wait(first_completed)

来实现这一点

基本上(有点伪代码)

async def new_system()
    boto3.start_sync_execution(Express State Machine)

async def legacy_system()
    await aiohttp.post(request)

async def wait_first()
    done, pending = async.wait(new_system_task, legacy_system_task, First_completed)
    
    for task in done
        check if it was an error
              if error, wait for pending
        else
              cancel pending
    

现在,我们的业务需求略有变化 (hah),要求他们能够将我们系统的输出与之前系统的输出进行比较。我们无法直接 link 两个系统数据库并关联它们之间的请求,因此我们认为我们会将请求提取到 lambda 中。

通过上面的伪代码,您可以看到遗留系统不会对 parse/store/attribute 对我们的相关 ID 做出响应。

但是如果我们将它拉出到一个 lambda 中,如果我们的 returned 成功,我们可以停止监听 lambda 响应。然而,lambda 仍将继续 运行 允许我们获取响应、解析和存储它。

但是,如果我们将 aiohttp.post 替换为 boto3.function 调用,那么我们 运行 就会陷入经典的异步问题 - 状态机执行或 boto3 调用都不会与异步相关联循环,因此将 return 'simultaneously' 等待。

我有 4 种可能的解决方案,但我不确定哪个是最好的我想就我可能不知道或错误的每种方案的优缺点提出建议

  1. 不要取消挂起的任务。让挂起的任务完成 - 这样我们就可以将 parse/store 放入 legacy_system( ) 功能没有太多麻烦

这里的恐惧是我们必须尽快 return 到 API。因此,如果我们 return 使用我们的新系统,而遗留系统仍然 'running' 在调用中再持续 3-4 秒,这会严重破坏 lambda 的并发执行吗?

  1. 使用 async 中的 executor_functions 将 new_system() 和 legacy_system() 绑定到循环中。我不确定如何直接执行此操作,但据我了解代码,这应该可以在没有任何不利副作用的情况下工作,并允许我们完成我们想要的

  2. 使用 boto3 函数中的 invoke(type=event) 进行异步调用,并添加一个等待响应池的函数 This 似乎 没问题,但我不确定它在实践中是否真的有效,我认为它过于复杂 - 轮询完成的 lambda 执行感觉对我们的事件驱动服务的其余部分不利过于复杂

  3. 在函数中添加一个 await asyncio.sleep() 我不确定我们是否可以在 await [=75] 上做不到 1 秒=]?如果可以的话,这可能是问题最少的最简单的答案,但似乎 真的 friggin hacky。

今晚我尝试了选项 2,但还没有弄清楚如何让它发挥作用。

提前致谢

虽然我尝试在不导入大量库的情况下做事,但在这种情况下没有太多选择。 Boto3 固有地调用 Block 异步循环。然而 aioboto3 库处理这个。

import aioboto3

session = aioboto3.Session()

async with session.client('lambda') as fn_client:
    response = await fn_client.invoke(...