在 AWS Lambda 实例之间协调单例令牌

Coordinating singleton token between AWS Lambda instances

考虑一个编写为 AWS Lambda 函数的聊天机器人。它通过来自第 3 方服务的 HTTP 请求被调用,它集成到聊天服务中。现在,第 3 方 API 有点……古怪。它的主要问题是:它需要一个身份验证令牌才能与之交互,但任何时候只能存在一个令牌。它的工作方式是:

关键在于 POST /auth 请求会重置任何先前的令牌并使持有令牌的所有其他客户端无效。没有 GET /auth 或类似的方法来获取现有令牌。

这现在变成了随机生成的机器人并发实例之间的协调问题。他们都需要使用相同的令牌,但会独立生成令牌。我不太热衷于引入一些单例服务,其任务只是协调令牌,我想保留独立扩展的 Lambda 范例。我正在使用 DynamoDB 存储令牌以尝试在机器人实例之间协调它们,但仍然存在边缘情况竞争条件,最多需要重试三次才能使令牌结算。

最坏的情况是存储在 DynamoDB 中的令牌无效,同时实例化两个机器人:

理想情况下会有一个可锁定的资源,一个机器人可以锁定并生成令牌,而其他人可以等待。但是 AFAIK DynamoDB 没有那种功能。

什么是好的模式and/or AWS 服务来协调独立的并行 Lambda 实例之间的此类单例令牌?

虽然 DynamoDB 中没有实现本机锁定,但这是 2017 年 AWS 博客 post 展示了 Java SDK 的 DynamoDB 锁定库。 https://aws.amazon.com/blogs/database/building-distributed-locks-with-the-dynamodb-lock-client/

这段摘录揭示了该机制:

The DynamoDB Lock Client uses the DynamoDB UpdateItem API to heartbeat and extend locks each host owns. Additionally, the lock client uses client-side TTL to expire locks. The lock client uses a recent version of the AWS SDK for Java. The locking protocol employs conditional updates extensively and throughout the lifecycle of a lock (creation, renewal by heartbeat, and expiration by deletion).

有一些针对其他语言的分叉和改编,例如 Python。

就我个人而言,我会尝试解耦令牌的读写操作。应该只有一个写入令牌的“进程”和多个从存储中读取令牌的“进程”。

例如,一种解决方案是您使用 AWS EventBridge 定期触发 Lambda(比如每 30 分钟一次)。此 Lambda 调用 /auth 端点,获取新令牌,然后将其存储在 DynamoDB 中。

然后,具有业务逻辑的 Lambda 应使用 consistent/strong 读取以避免竞争条件(写入一个新令牌,而另一个 Lambda 读取现在已过时的令牌)。

消息中的业务逻辑 Lambda 现在只需从数据库中读取令牌,允许您“扩展”到任意数量的 Lambda,而不会相互妨碍。

我想对@Jens 的回答提出一个稍微不同的看法:

  1. Use DynamoDB to store the current auth token: A DDB table with a single entry can be used as a authoritative source for the auth令牌。有关如何更新令牌的更多信息,请参见下面的 #3。

  2. 授权令牌的本地缓存:虽然 Lambda 应该是无状态的,但它确实有能力做到 some local caching,所以每个 Lambda 实例都应遵循如下逻辑:

    • 如果本地缓存中的 auth 令牌具有 NULL 值,则从 DDB 读取它并更新本地缓存
    • 如果授权令牌具有非 NULL 值,则尝试使用授权令牌发出请求
    • 如果请求因令牌无效而失败,请尝试刷新令牌(请参阅下面的#3)
  3. 利用条件写入来刷新令牌:为此,任何发现错误令牌的 lambda 实例都应尝试“指定自身”以刷新令牌通过在 DDB table 中的令牌记录上执行 conditional write 令牌,试图声称自己是自我指定的更新者

    • 如果条件写入成功,lambda 继续使 POST /auth 请求,获取新令牌并随后更新 DDB 记录并从锁中删除自身;
    • 如果条件写入失败,lambda 应该后退一段时间(可能是 200 毫秒左右),然后尝试重新读取记录以查看另一个 lambda 是否同时成功刷新了令牌
    • 如果在读取记录时 lambda 确定令牌已被刷新(即令牌值与其原来的值不同),它可以继续使用新令牌并且一切正常;如果它发现记录仍然被锁定,它可以再等一会儿再试
    • 你可能应该想出一个安全的后退时间段(比如 1-2 秒),在此之后 lambda 应该“接管”更新令牌的责任,假设另一个实例是先前已锁定它已因任何原因被终止;这是你不会陷入所有实例永远等待的死锁

此解决方案的优点是减少了为每个请求从 DDB 读取的需要,并且不需要任何其他服务(除了 Lambda 和 DDB)。

执行条件写入时,lambda 可以为操作生成 UUID 并将此 UUID 和项目上的时间戳存储起来,以指示项目被“锁定”的时间。后续实例将读取 UUID 和时间戳,并确定某个其他实例必须正在处理该项目。如果自项目被锁定以来已经过去了太多时间,另一个实例可以“接管”。

我最终使用分布式锁算法实现了这一点,依赖于 DynamoDB 的条件写入,如 and . I'll provide some of the details below (implemented in Python's boto3 API 中大致概述的那样,省略了一些特定于实现的细节):

  1. 从 DynamoDB 读取令牌并将其存储在本地。

  2. 如果使用令牌发出的请求被拒绝,请在内部将令牌标记为“无效”。

  3. 如果不存在令牌或已知令牌无效,请按照以下步骤操作:

    if old_token:
        condition = Attr('AccessToken').eq(old_token)
    else:
        condition = Attr('AccessToken').not_exists()
    
    release_code = uuid4().hex
    now = int(time())
    lock = {'release_code': release_code, 'expires': now + 15}
    condition &= Attr('AccessTokenUpdateLock').not_exists() | Attr('AccessTokenUpdateLock.expires').lte(now)
    
    table.update_item(
        Key=key,
        UpdateExpression='SET AccessTokenUpdateLock = :l',
        ExpressionAttributeValues={':l': lock},
        ConditionExpression=condition,
        ReturnValues='ALL_NEW'
    )
    

    简而言之,这会在 table 中的特定机器人记录中添加一个 AccessTokenUpdateLock 属性, 如果 没有这样的锁存在或已经过期。这样的锁基本上是一个随机字符串。如果成功,则可以安全地从 API 请求新令牌。否则这将失败并显示 DynamoDB.Client.exceptions.ConditionalCheckFailedException.

    在后一种情况下,客户端进入指数级回退读取循环:

    @backoff.on_predicate(backoff.expo, jitter=backoff.full_jitter, max_value=10, max_time=60)
    def await_updated_access_token():
        item = table.get_item(Key=key, ConsistentRead=True)
    
        if item['Item'].get('AccessToken') != old_token:
            # store new token
            return # new token
    
        try:
            if item['Item']['AccessTokenUpdateLock']['expires'] <= time():
                raise AccessTokenUpdateHasFailed
        except KeyError:
            return None
    

    backoff library 将不断重复此操作,直到函数 returns 标记或引发异常。如果引发 AccessTokenUpdateHasFailed,它会再次启动并尝试自己获取锁。

    如果客户端 did 成功获取锁,它会获取一个新的 API 令牌,并将其写入 DynamoDB:

    if old_token:
        condition = Attr('AccessToken').eq(old_token)
    else:
        condition = Attr('AccessToken').not_exists()
    
    condition &= Attr('AccessTokenUpdateLock.release_code').eq(release_code)
    
    response = table.update_item(
        Key=key,
        UpdateExpression='SET AccessToken = :t REMOVE AccessTokenUpdateLock',
        ExpressionAttributeValues={':t': new_token},
        ConditionExpression=condition,
        ReturnValues='ALL_NEW'
    )
    

这似乎相当稳健,对 API 本身的重试次数最少。