Calling AWS RDS CreateDBSnapshotAsync Asynchronously "Set It And Forget It"

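In an AWS Lambda function, I want to call a component that creates an RDS DB snapshot. The client has an async method named CreateDBSnapshotAsync. However, because this is AWS Lambda, I only have 5 minutes to complete the task. So if I await the call, the Lambda function times out. And apparently, when it times out, the call is cancelled and the snapshot is never completed.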

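Is there a way to make the call fully asynchronous, so that once I invoke it, it runs to completion whether or not my Lambda function times out? In other words, I don't care about the result; I just want to kick off the process and move on, a "set it and forget it" mentality.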

My call (obviously without an await) is as follows:

        using (var rdsClient = new AmazonRDSClient())
        {
            Task<CreateDBSnapshotResponse> response = rdsClient.CreateDBSnapshotAsync(new CreateDBSnapshotRequest($"MySnapShot", instanceId));
        }

As requested, here are the complete methods:

    public async Task<CloudFormationResponse> MigrateDatabase(CloudFormationRequest request, ILambdaContext context)
    {
        LambdaLogger.Log($"{nameof(MigrateDatabase)} invoked: " + JsonConvert.SerializeObject(request));


        if (request.RequestType != "Delete")
        {
            try
            {
                var migrations = this.Context.Database.GetPendingMigrations().OrderBy(b=>b).ToList();
                for (int i = 0; i < migrations.Count(); i++)
                {
                    string thisMigration = migrations [i];
                    this.ApplyMigrationInternal(thisMigration);
                }
                this.TakeSnapshotAsync(context,migrations.Last());
                return await CloudFormationResponse.CompleteCloudFormationResponse(null, request, context);

            }
            catch (Exception e)
            {
                LambdaLogger.Log(e.ToString());
                if (e.InnerException != null) LambdaLogger.Log(e.InnerException.ToString());
                return await CloudFormationResponse.CompleteCloudFormationResponse(e, request, context);
            }
        }
        return await CloudFormationResponse.CompleteCloudFormationResponse(null, request, context);

    }

    internal void TakeSnapshotAsync(ILambdaContext context, string migration)
    {

        var instanceId = this.GetEnvironmentVariable(nameof(DBInstance));

        using (var rdsClient = new AmazonRDSClient())
        {


            Task<CreateDBSnapshotResponse> response = rdsClient.CreateDBSnapshotAsync(new CreateDBSnapshotRequest($"{instanceId}{migration.Replace('_','-')}", instanceId));
            while (context.RemainingTime > TimeSpan.FromSeconds(15))
            {
                Thread.Sleep(15000);
            }
        }
    }

First, refactor that helper method to use proper async syntax together with Task.WhenAny.

internal async Task TakeSnapshotAsync(ILambdaContext context, string migration) {
    var instanceId = this.GetEnvironmentVariable(nameof(DBInstance));
    //don't wrap in using block or it will be disposed before you are done with it.
    var rdsClient = new AmazonRDSClient();
    var request = new CreateDBSnapshotRequest($"{instanceId}{migration.Replace('_','-')}", instanceId);
    //don't await this long running task
    Task<CreateDBSnapshotResponse> response = rdsClient.CreateDBSnapshotAsync(request);
    Task delay = Task.Run(async () => {
        while (context.RemainingTime > TimeSpan.FromSeconds(15)) {
            await Task.Delay(15000); // use Task.Delay with await rather than blocking with Thread.Sleep
        }
    });
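    // Task.WhenAny completes as soon as the first task completes,
    // even if the other is still running.
    Task completed = await Task.WhenAny(response, delay);
    // Awaiting the finished snapshot task rethrows any exception it captured.
    if (completed == response) await response;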
}

So if RemainingTime runs out, the method returns even though the snapshot task may still be running, and the request does not time out.
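The racing behaviour can be seen in isolation with a minimal sketch that needs no AWS calls. `SlowOperationAsync` and `FinishedWithinAsync` are hypothetical names used only for illustration, and the durations are arbitrary; the only library call relied on is the standard `Task.WhenAny`.

```csharp
using System;
using System.Threading.Tasks;

public static class WhenAnyDemo
{
    // Hypothetical stand-in for a long-running call such as CreateDBSnapshotAsync.
    public static async Task<string> SlowOperationAsync(TimeSpan duration)
    {
        await Task.Delay(duration);
        return "done";
    }

    // Returns true if the operation finished before the time budget ran out.
    public static async Task<bool> FinishedWithinAsync(Task operation, TimeSpan budget)
    {
        Task timeout = Task.Delay(budget);
        // WhenAny completes when the first of the two tasks completes;
        // the other task is neither cancelled nor awaited.
        Task first = await Task.WhenAny(operation, timeout);
        return ReferenceEquals(first, operation);
    }

    public static async Task Main()
    {
        Task<string> slow = SlowOperationAsync(TimeSpan.FromSeconds(2));
        bool finished = await FinishedWithinAsync(slow, TimeSpan.FromMilliseconds(100));
        Console.WriteLine($"Finished within budget: {finished}");
        Console.WriteLine($"Slow task already completed: {slow.IsCompleted}");
    }
}
```

Nothing in this pattern keeps the losing task alive on its own; whether it runs to completion depends on the process continuing to run after the method returns.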

You should now be able to await the snapshot for as long as there is still time available in the context:

public async Task<CloudFormationResponse> MigrateDatabase(CloudFormationRequest request, ILambdaContext context) {
    LambdaLogger.Log($"{nameof(MigrateDatabase)} invoked: " + JsonConvert.SerializeObject(request));

    if (request.RequestType != "Delete") {
        try {
            var migrations = this.Context.Database.GetPendingMigrations().OrderBy(b=>b).ToList();
            for (int i = 0; i < migrations.Count(); i++) {
                string thisMigration = migrations [i];
                this.ApplyMigrationInternal(thisMigration);
            }
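            // Last() throws on an empty list, so only snapshot when a migration was applied.
            if (migrations.Count > 0) await this.TakeSnapshotAsync(context, migrations.Last());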
            return await CloudFormationResponse.CompleteCloudFormationResponse(null, request, context);
        } catch (Exception e) {
            LambdaLogger.Log(e.ToString());
            if (e.InnerException != null) LambdaLogger.Log(e.InnerException.ToString());
            return await CloudFormationResponse.CompleteCloudFormationResponse(e, request, context);
        }
    }
    return await CloudFormationResponse.CompleteCloudFormationResponse(null, request, context);

}

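This should also let exceptions thrown by the RDS client surface in the calling code, which should help when troubleshooting any error messages. Note that Task.WhenAny does not rethrow on its own: a faulted snapshot task only throws once that task itself is awaited.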
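How a fault is observed after a race can be sketched without AWS. `FailingOperationAsync` is a hypothetical stand-in for a failing RDS call, and `RaceAndObserveAsync` is an illustrative helper, not part of any SDK.

```csharp
using System;
using System.Threading.Tasks;

public static class WhenAnyExceptionDemo
{
    // Hypothetical stand-in for an RDS call that fails shortly after being started.
    public static async Task<string> FailingOperationAsync()
    {
        await Task.Delay(10);
        throw new InvalidOperationException("snapshot request rejected");
    }

    // Races the operation against a timeout. Returns the error message if the
    // operation finished first and faulted, otherwise an empty string.
    public static async Task<string> RaceAndObserveAsync(Task operation, TimeSpan budget)
    {
        // WhenAny itself does not throw, even if the winning task has faulted.
        Task first = await Task.WhenAny(operation, Task.Delay(budget));
        if (first != operation) return string.Empty; // timed out; operation still in flight
        try
        {
            await operation; // rethrows the exception stored in the task, if any
            return string.Empty;
        }
        catch (Exception e)
        {
            return e.Message;
        }
    }

    public static async Task Main()
    {
        string error = await RaceAndObserveAsync(FailingOperationAsync(), TimeSpan.FromSeconds(5));
        Console.WriteLine(error.Length > 0 ? error : "no error observed");
    }
}
```

If the timeout wins the race, a later failure of the operation is never observed by this code, so logging would have to happen inside the operation itself.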

Some interesting information from the documentation:

Using Async in C# Functions with AWS Lambda

If you know your Lambda function will require a long-running process, such as uploading large files to Amazon S3 or reading a large stream of records from DynamoDB, you can take advantage of the async/await pattern. When you use this signature, Lambda executes the function synchronously and waits for the function to return a response or for execution to time out.

From the docs about timeouts:

Function Settings

...

  • Timeout – The amount of time that Lambda allows a function to run before stopping it. The default is 3 seconds. The maximum allowed value is 900 seconds.

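If you are running into an HTTP timeout, shorten the delay but leave the long-running task in place. You still use Task.WhenAny to give the long-running task a chance to finish first, even if that is not the expected outcome.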

internal async Task TakeSnapshotAsync(ILambdaContext context, string migration) {
    var instanceId = this.GetEnvironmentVariable(nameof(DBInstance));
    //don't wrap in using block or it will be disposed before you are done with it.
    var rdsClient = new AmazonRDSClient();
    var request = new CreateDBSnapshotRequest($"{instanceId}{migration.Replace('_','-')}", instanceId);
    //don't await this long running task
    Task<CreateDBSnapshotResponse> response = rdsClient.CreateDBSnapshotAsync(request);
    Task delay = Task.Delay(TimeSpan.FromSeconds(2.5));
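    // Task.WhenAny completes as soon as the first task completes,
    // even if the other is still running.
    Task completed = await Task.WhenAny(response, delay);
    // Awaiting the finished snapshot task rethrows any exception it captured.
    if (completed == response) await response;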
}
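Instead of hard-coding the delay, one option is to derive it from the time the context reports as remaining. The sketch below is only an illustration: `ComputeWaitBudget`, the safety margin, and the cap are assumptions, not part of the Lambda API.

```csharp
using System;

public static class WaitBudget
{
    // Returns how long to wait for the snapshot call: the remaining time minus
    // a safety margin, never negative and never longer than the cap.
    public static TimeSpan ComputeWaitBudget(TimeSpan remaining, TimeSpan safetyMargin, TimeSpan cap)
    {
        TimeSpan available = remaining - safetyMargin;
        if (available <= TimeSpan.Zero) return TimeSpan.Zero;
        return available < cap ? available : cap;
    }

    public static void Main()
    {
        TimeSpan budget = ComputeWaitBudget(
            TimeSpan.FromSeconds(40),   // e.g. the value of context.RemainingTime
            TimeSpan.FromSeconds(15),   // margin kept for sending the CloudFormation response
            TimeSpan.FromSeconds(60));  // upper bound on the wait
        Console.WriteLine(budget); // prints 00:00:25
    }
}
```

In the handler, the result would replace the fixed value, for example `Task.Delay(WaitBudget.ComputeWaitBudget(context.RemainingTime, margin, cap))`, where `margin` and `cap` are values you choose.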