使用条件表达式对 DynamoDB 的并发更新有时同时通过

Concurrent updates on DynamoDB with Conditional Expression sometime both passing

我遇到一个问题,两个并发进程在 5 毫秒内更新 DynamoDB table,并且当我希望其中一个进程抛出 ConditionalCheckFailedException 异常时,它们都通过了条件表达式。文档状态:

DynamoDB supports mechanisms, like conditional writes, that are necessary for distributed locks.

https://aws.amazon.com/blogs/database/building-distributed-locks-with-the-dynamodb-lock-client/

我的 table 架构有一个名为“Id”的键属性:

AttributeDefinitions:
    -
      AttributeName: "Id"
      AttributeType: "S"
  KeySchema:
    -
      AttributeName: "Id"
      KeyType: "HASH"

我的条件表达式是:

string conditional = "attribute_not_exists(StartedRefreshingAt)";

包括条件表达式的锁定方法:

private bool Lock(Configuration config)
{
    string conditional = "attribute_not_exists(StartedRefreshingAt)";       
    Dictionary<string, AttributeValue> values = new Dictionary<string, AttributeValue>{
        {":new_refresh", new AttributeValue(ToDDBDate(DateTime.Now))}};         

    try
    {           
        _dynamoDB.UpdateItemAsync(new UpdateItemRequest
        {
            TableName = TABLE_NAME,
            Key = new Dictionary<string, AttributeValue>{{"Id", new AttributeValue(config.Id)}},
            UpdateExpression = "set StartedRefreshingAt = :new_refresh",
            ConditionExpression = conditional,
            ExpressionAttributeValues = values

        }).Wait();
        return true;
    }
    catch (Exception)
    {
        return false;
    }
}

如果 returns 为真,我认为 table 已锁定,因为 StartedRefreshingAt 属性现在存在。

完成对其他属性记录的一些其他更新后,我再次删除了 StartedRefreshingAt 属性,有效地释放了锁。这是使用lock方法的方法:

private async Task<Configuration> RefreshAsync(Configuration config)
{
    // concurrent executions may enter here with the same value
    // for config
    if (config.AccessTokenExpired()) // Validates the age of the config.AccessToken
    {
        _logger.LogInformation($"Refreshing expired access token for {config.Id}");

        if (Lock(config))
        {
            // The below code should not be executed concurrently
            
            var authPayload = new List<KeyValuePair<string, string>>();
            authPayload.Add(new KeyValuePair<string, string>("grant_type", "refresh_token"));
            authPayload.Add(new KeyValuePair<string, string>("refresh_token", config.RefreshToken));
            
            // 3rd party REST API call
            // IF execution 1 completed and saved first, the below API
            // call should fail with an "invlaid grant" response
            // since it will be using a stale refresh token.
            JObject authResponse = await GetAuthTokensAsync(authPayload); 

            // Bad practice, but print the auth tokens to logs here 
            // in case we need to recover a RefreshToken.
            _logger.LogInformation($"Got auth token response for {config.Id}:" +
                $" {authResponse.ToString(Formatting.None)}");
            
            config.AccessToken = authResponse["access_token"].ToString();
            config.AccessTokenCreated = DateTime.Now;
            // RefreshToken is updated whenever the API call succeeds.
            config.RefreshToken = authResponse["refresh_token"].ToString();
            config.RefreshTokenCreated = config.AccessTokenCreated;
            config.StartedRefreshingAt = null; // lock attribute
            
            await Save(config); // saves to DynamoDB, releasing the lock.
        }
        else
        {
            _logger.LogInformation($"Someone else is refreshing {config.Id} at same time, so ignoring and returning current value");
            // this will return a potentially stale config object, which client code is expected to handle
        }
    }

    return config;
}

大多数时候,当两个并发执行 运行 时,此代码在两次执行之一上成功 returns false,平均每天大约 20 次。这似乎表明表达式正在正确评估。但大约每 2 周一次,并发执行返回 true。

编辑:在查看答案并提供更多代码上下文后,问题很可能不是 DynamoDB,执行 2 的条件写入在执行 1 释放锁后成功.这可能是与第 3 方 OAuth 服务器的一致性问题。

现在我认为问题在于执行 2 使用与执行 1 相同的刷新令牌成功完成了 OAuth API 调用。这应该是不可能的。

最终结果是我在 DynamoDB 中保存了一个 RefreshToken,它永远 returns“无效授予”,这意味着它已过时。如果我查看日志,了解日志行“Got auth token response for”何时由两个非常接近的执行写入,我可以使用首先记录的 RefreshToken 手动更新 DynamoDB table,它会恢复。

这个场景对于竞争条件来说非常经典:

  1. 执行 1 设置 StartedRefreshingAt 属性并且 returns 为真,继续完成其他工作
  2. 执行 2 设置 StartedRefreshingAt 属性和 returns true,继续完成其他工作
  3. 执行 2 将额外工作的结果写回到 table
  4. 执行 1 将额外工作的结果写回 table,覆盖执行 2 最近完成的工作。

也有可能有时步骤 3 和步骤 4 是相反执行的,但这对我来说不存在一致性问题,因为最近完成的工作是所需的最终结果。

您所建议的竞争非常令人惊讶,因为这正是 DynamoDB 声称其条件更新所避免的。因此,要么亚马逊在其实施中存在严重错误(这令人惊讶,但并非不可能),要么比赛实际上与您在问题中描述的不同。

在您的时间表中,您没有说明您的代码如何将“StartedRefreshingAt”重置为空。将工作结果写回 table 的相同 UpdateTable 操作是否也会删除 StartedRefreshingAt 属性?因为如果是单独的写入,理论上可以(即使不常见)将两次写入重新排序。如果 StartedRefreshingAt 首先被删除,那一刻第二个进程可以开始自己的工作 - 在第一个进程的结果被写入之前 - 所以你描述的问题可能会发生。

您没有说明的另一件事是您的处理过程如何读取 项目中的工作。如果您不小心使用最终一致性而不是强一致性进行读取,则执行 2 实际上可能在执行 1 完成后开始,但是当它读取它需要做的工作时 - 它再次读取旧值而不是什么执行 1 写道 - 所以执行 2 最终重复了 1 的工作而不是做新的工作。

我不知道这些猜测是否有道理,因为我不知道您的应用程序的详细信息,但我认为 DynamoDB 一致性根本无法按承诺工作的可能性是我最后的猜测制作.