在 AWS Lambda 实例之间协调单例令牌
Coordinating singleton token between AWS Lambda instances
考虑一个编写为 AWS Lambda 函数的聊天机器人。它通过来自第 3 方服务的 HTTP 请求被调用,它集成到聊天服务中。现在,第 3 方 API 有点……古怪。它的主要问题是:它需要一个身份验证令牌才能与之交互,但任何时候只能存在一个令牌。它的工作方式是:
- Bot 使用一堆秘密信息创建对
POST /auth
的请求,并返回一个令牌作为响应。
- Bot 然后向
POST /messages
发出请求,并向聊天室发送 Authorization: Bearer <token>
至 post 条消息。
- 令牌在几个小时后过期。
关键在于 POST /auth
请求会重置任何先前的令牌并使持有令牌的所有其他客户端无效。没有 GET /auth
或类似的方法来获取现有令牌。
这现在变成了随机生成的机器人并发实例之间的协调问题。他们都需要使用相同的令牌,但会独立生成令牌。我不太热衷于引入一些单例服务,其任务只是协调令牌,我想保留独立扩展的 Lambda 范例。我正在使用 DynamoDB 存储令牌以尝试在机器人实例之间协调它们,但仍然存在边缘情况竞争条件,最多需要重试三次才能使令牌结算。
最坏的情况是存储在 DynamoDB 中的令牌无效,同时实例化两个机器人:
- 两者都将读取无效令牌
- 两者都会尝试失败的请求
- 两者都将尝试再次读取令牌,因为另一个机器人可能同时刷新了它
- 两者都会丢弃已知是坏的令牌
- 两者都会生成一个新的令牌并存储它,其中一个随机“获胜”
- “失败者”将执行另一个错误请求并重复前面的步骤
理想情况下会有一个可锁定的资源,一个机器人可以锁定并生成令牌,而其他人可以等待。但是 AFAIK DynamoDB 没有那种功能。
什么是好的模式and/or AWS 服务来协调独立的并行 Lambda 实例之间的此类单例令牌?
虽然 DynamoDB 中没有实现本机锁定,但这是 2017 年 AWS 博客 post 展示了 Java SDK 的 DynamoDB 锁定库。
https://aws.amazon.com/blogs/database/building-distributed-locks-with-the-dynamodb-lock-client/
这段摘录揭示了该机制:
The DynamoDB Lock Client uses the DynamoDB UpdateItem API to heartbeat
and extend locks each host owns. Additionally, the lock client uses
client-side TTL to expire locks. The lock client uses a recent version
of the AWS SDK for Java. The locking protocol employs conditional
updates extensively and throughout the lifecycle of a lock (creation,
renewal by heartbeat, and expiration by deletion).
有一些针对其他语言的分叉和改编,例如 Python。
就我个人而言,我会尝试解耦令牌的读写操作。应该只有一个写入令牌的“进程”和多个从存储中读取令牌的“进程”。
例如,一种解决方案是您使用 AWS EventBridge 定期触发 Lambda(比如每 30 分钟一次)。此 Lambda 调用 /auth
端点,获取新令牌,然后将其存储在 DynamoDB 中。
然后,具有业务逻辑的 Lambda 应使用 consistent/strong 读取以避免竞争条件(写入一个新令牌,而另一个 Lambda 读取现在已过时的令牌)。
消息中的业务逻辑 Lambda 现在只需从数据库中读取令牌,允许您“扩展”到任意数量的 Lambda,而不会相互妨碍。
我想对@Jens 的回答提出一个稍微不同的看法:
Use DynamoDB to store the current auth token: A DDB table with a single entry can be used as a authoritative source for the auth令牌。有关如何更新令牌的更多信息,请参见下面的 #3。
授权令牌的本地缓存:虽然 Lambda 应该是无状态的,但它确实有能力做到 some local caching,所以每个 Lambda 实例都应遵循如下逻辑:
- 如果本地缓存中的 auth 令牌具有 NULL 值,则从 DDB 读取它并更新本地缓存
- 如果授权令牌具有非 NULL 值,则尝试使用授权令牌发出请求
- 如果请求因令牌无效而失败,请尝试刷新令牌(请参阅下面的#3)
利用条件写入来刷新令牌:为此,任何发现错误令牌的 lambda 实例都应尝试“指定自身”以刷新令牌通过在 DDB table 中的令牌记录上执行 conditional write 令牌,试图声称自己是自我指定的更新者
- 如果条件写入成功,lambda 继续使
POST /auth
请求,获取新令牌并随后更新
DDB 记录并从锁中删除自身;
- 如果条件写入失败,lambda 应该后退一段时间(可能是 200 毫秒左右),然后尝试重新读取记录以查看另一个 lambda 是否同时成功刷新了令牌
- 如果在读取记录时 lambda 确定令牌已被刷新(即令牌值与其原来的值不同),它可以继续使用新令牌并且一切正常;如果它发现记录仍然被锁定,它可以再等一会儿再试
- 你可能应该想出一个安全的后退时间段(比如 1-2 秒),在此之后 lambda 应该“接管”更新令牌的责任,假设另一个实例是先前已锁定它已因任何原因被终止;这是你不会陷入所有实例永远等待的死锁
此解决方案的优点是减少了为每个请求从 DDB 读取的需要,并且不需要任何其他服务(除了 Lambda 和 DDB)。
执行条件写入时,lambda 可以为操作生成 UUID 并将此 UUID 和项目上的时间戳存储起来,以指示项目被“锁定”的时间。后续实例将读取 UUID 和时间戳,并确定某个其他实例必须正在处理该项目。如果自项目被锁定以来已经过去了太多时间,另一个实例可以“接管”。
我最终使用分布式锁算法实现了这一点,依赖于 DynamoDB 的条件写入,如 and . I'll provide some of the details below (implemented in Python's boto3 API 中大致概述的那样,省略了一些特定于实现的细节):
从 DynamoDB 读取令牌并将其存储在本地。
如果使用令牌发出的请求被拒绝,请在内部将令牌标记为“无效”。
如果不存在令牌或已知令牌无效,请按照以下步骤操作:
if old_token:
condition = Attr('AccessToken').eq(old_token)
else:
condition = Attr('AccessToken').not_exists()
release_code = uuid4().hex
now = int(time())
lock = {'release_code': release_code, 'expires': now + 15}
condition &= Attr('AccessTokenUpdateLock').not_exists() | Attr('AccessTokenUpdateLock.expires').lte(now)
table.update_item(
Key=key,
UpdateExpression='SET AccessTokenUpdateLock = :l',
ExpressionAttributeValues={':l': lock},
ConditionExpression=condition,
ReturnValues='ALL_NEW'
)
简而言之,这会在 table 中的特定机器人记录中添加一个 AccessTokenUpdateLock
属性, 如果 没有这样的锁存在或已经过期。这样的锁基本上是一个随机字符串。如果成功,则可以安全地从 API 请求新令牌。否则这将失败并显示 DynamoDB.Client.exceptions.ConditionalCheckFailedException
.
在后一种情况下,客户端进入指数级回退读取循环:
@backoff.on_predicate(backoff.expo, jitter=backoff.full_jitter, max_value=10, max_time=60)
def await_updated_access_token():
item = table.get_item(Key=key, ConsistentRead=True)
if item['Item'].get('AccessToken') != old_token:
# store new token
return # new token
try:
if item['Item']['AccessTokenUpdateLock']['expires'] <= time():
raise AccessTokenUpdateHasFailed
except KeyError:
return None
backoff
library 将不断重复此操作,直到函数 returns 标记或引发异常。如果引发 AccessTokenUpdateHasFailed
,它会再次启动并尝试自己获取锁。
如果客户端 did 成功获取锁,它会获取一个新的 API 令牌,并将其写入 DynamoDB:
if old_token:
condition = Attr('AccessToken').eq(old_token)
else:
condition = Attr('AccessToken').not_exists()
condition &= Attr('AccessTokenUpdateLock.release_code').eq(release_code)
response = table.update_item(
Key=key,
UpdateExpression='SET AccessToken = :t REMOVE AccessTokenUpdateLock',
ExpressionAttributeValues={':t': new_token},
ConditionExpression=condition,
ReturnValues='ALL_NEW'
)
这似乎相当稳健,对 API 本身的重试次数最少。
考虑一个编写为 AWS Lambda 函数的聊天机器人。它通过来自第 3 方服务的 HTTP 请求被调用,它集成到聊天服务中。现在,第 3 方 API 有点……古怪。它的主要问题是:它需要一个身份验证令牌才能与之交互,但任何时候只能存在一个令牌。它的工作方式是:
- Bot 使用一堆秘密信息创建对
POST /auth
的请求,并返回一个令牌作为响应。 - Bot 然后向
POST /messages
发出请求,并向聊天室发送Authorization: Bearer <token>
至 post 条消息。 - 令牌在几个小时后过期。
关键在于 POST /auth
请求会重置任何先前的令牌并使持有令牌的所有其他客户端无效。没有 GET /auth
或类似的方法来获取现有令牌。
这现在变成了随机生成的机器人并发实例之间的协调问题。他们都需要使用相同的令牌,但会独立生成令牌。我不太热衷于引入一些单例服务,其任务只是协调令牌,我想保留独立扩展的 Lambda 范例。我正在使用 DynamoDB 存储令牌以尝试在机器人实例之间协调它们,但仍然存在边缘情况竞争条件,最多需要重试三次才能使令牌结算。
最坏的情况是存储在 DynamoDB 中的令牌无效,同时实例化两个机器人:
- 两者都将读取无效令牌
- 两者都会尝试失败的请求
- 两者都将尝试再次读取令牌,因为另一个机器人可能同时刷新了它
- 两者都会丢弃已知是坏的令牌
- 两者都会生成一个新的令牌并存储它,其中一个随机“获胜”
- “失败者”将执行另一个错误请求并重复前面的步骤
理想情况下会有一个可锁定的资源,一个机器人可以锁定并生成令牌,而其他人可以等待。但是 AFAIK DynamoDB 没有那种功能。
什么是好的模式and/or AWS 服务来协调独立的并行 Lambda 实例之间的此类单例令牌?
虽然 DynamoDB 中没有实现本机锁定,但这是 2017 年 AWS 博客 post 展示了 Java SDK 的 DynamoDB 锁定库。 https://aws.amazon.com/blogs/database/building-distributed-locks-with-the-dynamodb-lock-client/
这段摘录揭示了该机制:
The DynamoDB Lock Client uses the DynamoDB UpdateItem API to heartbeat and extend locks each host owns. Additionally, the lock client uses client-side TTL to expire locks. The lock client uses a recent version of the AWS SDK for Java. The locking protocol employs conditional updates extensively and throughout the lifecycle of a lock (creation, renewal by heartbeat, and expiration by deletion).
有一些针对其他语言的分叉和改编,例如 Python。
就我个人而言,我会尝试解耦令牌的读写操作。应该只有一个写入令牌的“进程”和多个从存储中读取令牌的“进程”。
例如,一种解决方案是您使用 AWS EventBridge 定期触发 Lambda(比如每 30 分钟一次)。此 Lambda 调用 /auth
端点,获取新令牌,然后将其存储在 DynamoDB 中。
然后,具有业务逻辑的 Lambda 应使用 consistent/strong 读取以避免竞争条件(写入一个新令牌,而另一个 Lambda 读取现在已过时的令牌)。
消息中的业务逻辑 Lambda 现在只需从数据库中读取令牌,允许您“扩展”到任意数量的 Lambda,而不会相互妨碍。
我想对@Jens 的回答提出一个稍微不同的看法:
Use DynamoDB to store the current auth token: A DDB table with a single entry can be used as a authoritative source for the auth令牌。有关如何更新令牌的更多信息,请参见下面的 #3。
授权令牌的本地缓存:虽然 Lambda 应该是无状态的,但它确实有能力做到 some local caching,所以每个 Lambda 实例都应遵循如下逻辑:
- 如果本地缓存中的 auth 令牌具有 NULL 值,则从 DDB 读取它并更新本地缓存
- 如果授权令牌具有非 NULL 值,则尝试使用授权令牌发出请求
- 如果请求因令牌无效而失败,请尝试刷新令牌(请参阅下面的#3)
利用条件写入来刷新令牌:为此,任何发现错误令牌的 lambda 实例都应尝试“指定自身”以刷新令牌通过在 DDB table 中的令牌记录上执行 conditional write 令牌,试图声称自己是自我指定的更新者
- 如果条件写入成功,lambda 继续使
POST /auth
请求,获取新令牌并随后更新 DDB 记录并从锁中删除自身; - 如果条件写入失败,lambda 应该后退一段时间(可能是 200 毫秒左右),然后尝试重新读取记录以查看另一个 lambda 是否同时成功刷新了令牌
- 如果在读取记录时 lambda 确定令牌已被刷新(即令牌值与其原来的值不同),它可以继续使用新令牌并且一切正常;如果它发现记录仍然被锁定,它可以再等一会儿再试
- 你可能应该想出一个安全的后退时间段(比如 1-2 秒),在此之后 lambda 应该“接管”更新令牌的责任,假设另一个实例是先前已锁定它已因任何原因被终止;这是你不会陷入所有实例永远等待的死锁
- 如果条件写入成功,lambda 继续使
此解决方案的优点是减少了为每个请求从 DDB 读取的需要,并且不需要任何其他服务(除了 Lambda 和 DDB)。
执行条件写入时,lambda 可以为操作生成 UUID 并将此 UUID 和项目上的时间戳存储起来,以指示项目被“锁定”的时间。后续实例将读取 UUID 和时间戳,并确定某个其他实例必须正在处理该项目。如果自项目被锁定以来已经过去了太多时间,另一个实例可以“接管”。
我最终使用分布式锁算法实现了这一点,依赖于 DynamoDB 的条件写入,如
从 DynamoDB 读取令牌并将其存储在本地。
如果使用令牌发出的请求被拒绝,请在内部将令牌标记为“无效”。
如果不存在令牌或已知令牌无效,请按照以下步骤操作:
if old_token: condition = Attr('AccessToken').eq(old_token) else: condition = Attr('AccessToken').not_exists() release_code = uuid4().hex now = int(time()) lock = {'release_code': release_code, 'expires': now + 15} condition &= Attr('AccessTokenUpdateLock').not_exists() | Attr('AccessTokenUpdateLock.expires').lte(now) table.update_item( Key=key, UpdateExpression='SET AccessTokenUpdateLock = :l', ExpressionAttributeValues={':l': lock}, ConditionExpression=condition, ReturnValues='ALL_NEW' )
简而言之,这会在 table 中的特定机器人记录中添加一个
AccessTokenUpdateLock
属性, 如果 没有这样的锁存在或已经过期。这样的锁基本上是一个随机字符串。如果成功,则可以安全地从 API 请求新令牌。否则这将失败并显示DynamoDB.Client.exceptions.ConditionalCheckFailedException
.在后一种情况下,客户端进入指数级回退读取循环:
@backoff.on_predicate(backoff.expo, jitter=backoff.full_jitter, max_value=10, max_time=60) def await_updated_access_token(): item = table.get_item(Key=key, ConsistentRead=True) if item['Item'].get('AccessToken') != old_token: # store new token return # new token try: if item['Item']['AccessTokenUpdateLock']['expires'] <= time(): raise AccessTokenUpdateHasFailed except KeyError: return None
backoff
library 将不断重复此操作,直到函数 returns 标记或引发异常。如果引发AccessTokenUpdateHasFailed
,它会再次启动并尝试自己获取锁。如果客户端 did 成功获取锁,它会获取一个新的 API 令牌,并将其写入 DynamoDB:
if old_token: condition = Attr('AccessToken').eq(old_token) else: condition = Attr('AccessToken').not_exists() condition &= Attr('AccessTokenUpdateLock.release_code').eq(release_code) response = table.update_item( Key=key, UpdateExpression='SET AccessToken = :t REMOVE AccessTokenUpdateLock', ExpressionAttributeValues={':t': new_token}, ConditionExpression=condition, ReturnValues='ALL_NEW' )
这似乎相当稳健,对 API 本身的重试次数最少。