Azure Table 存储无法插入所有记录的 ExecuteAsync()

ExecuteAsync() of Azure Table Storage failing to insert all the records

我正在尝试将 10000 条记录插入到 Azure table 存储中。我正在使用 ExecuteAsync() 来实现它,但以某种方式插入了大约 7500 条记录,其余记录丢失了。我故意不使用 await 关键字,因为我不想等待结果,只想将它们存储在 table 中。下面是我的代码片段。

private static async void ConfigureAzureStorageTable()
    {
        CloudStorageAccount storageAccount =
            CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("StorageConnectionString"));
        CloudTableClient tableClient = storageAccount.CreateCloudTableClient();
        TableResult result = new TableResult();
        CloudTable table = tableClient.GetTableReference("test");
        table.CreateIfNotExists();

        for (int i = 0; i < 10000; i++)
        {
            var verifyVariableEntityObject = new VerifyVariableEntity()
            {
                ConsumerId = String.Format("{0}", i),
                Score = String.Format("{0}", i * 2 + 2),
                PartitionKey = String.Format("{0}", i),
                RowKey = String.Format("{0}", i * 2 + 2)
            };
            TableOperation insertOperation = TableOperation.Insert(verifyVariableEntityObject);
            try
            {
                table.ExecuteAsync(insertOperation);
            }
            catch (Exception e)
            {

                Console.WriteLine(e.Message);
            }
        }
    }

使用方法有什么不对的地方吗?

仍然想要await table.ExecuteAsync()。这将意味着 ConfigureAzureStorageTable() returns 控制权在此时交给调用者,调用者可以继续执行。

问题中的方式,ConfigureAzureStorageTable() 将继续调用 table.ExecuteAsync() 并退出,而 table 之类的内容将超出范围,而table.ExecuteAsync() 任务仍未完成。

关于在 SO 和其他地方使用 async void 有很多注意事项,您还需要考虑这些注意事项。您可以像 async Task 一样轻松地拥有您的方法,但不在调用者 中等待它 ,但保留返回的 Task 以进行干净终止等。

编辑:一个补充——你几乎肯定想在你的await上使用ConfigureAwait(false),因为你似乎不需要保留任何上下文。这个 blog post 有一些关于异步的指南。

一次使用 TableBatchOperation 到 运行 批 N 插入怎么样?

private const int BatchSize = 100;

private static async void ConfigureAzureStorageTable()
{
    CloudStorageAccount storageAccount =
        CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("StorageConnectionString"));
    CloudTableClient tableClient = storageAccount.CreateCloudTableClient();
    TableResult result = new TableResult();
    CloudTable table = tableClient.GetTableReference("test");
    table.CreateIfNotExists();

    var batchOperation = new TableBatchOperation();

    for (int i = 0; i < 10000; i++)
    {
        var verifyVariableEntityObject = new VerifyVariableEntity()
        {
            ConsumerId = String.Format("{0}", i),
            Score = String.Format("{0}", i * 2 + 2),
            PartitionKey = String.Format("{0}", i),
            RowKey = String.Format("{0}", i * 2 + 2)
        };
        TableOperation insertOperation = TableOperation.Insert(verifyVariableEntityObject);
        batchOperation.Add(insertOperation);

        if (batchOperation.Count >= BatchSize)
        {
            try
            {
                await table.ExecuteBatchAsync(batchOperation);
                batchOperation = new TableBatchOperation();
            }
            catch (Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }
    }

    if(batchOperation.Count > 0)
    {
        try
        {
            await table.ExecuteBatchAsync(batchOperation);
        }
        catch (Exception e)
        {
            Console.WriteLine(e.Message);
        }
    }
}

您可以根据需要调整 BatchSize。小免责声明:我没有尝试 运行 这个,虽然它应该有效。

但是我不禁想知道为什么你的功能是async void?这应该保留给事件处理程序和您无法决定接口的类似事件处理程序。在大多数情况下,您想要 return 一项任务。因为现在调用者无法捕获此函数中发生的异常。

async void 不是一个好的做法,除非它是一个事件处理程序。

https://msdn.microsoft.com/en-us/magazine/jj991977.aspx

如果您计划将许多记录插入到 Azure table 存储中,批量插入是您的最佳选择。

https://msdn.microsoft.com/en-us/library/azure/microsoft.windowsazure.storage.table.tablebatchoperation.aspx

请记住,它限制每批处理 100 table 次操作。

根据您的要求,我已经通过使用 CloudTable.ExecuteAsyncCloudTable.ExecuteBatchAsync 成功地在我这边测试了您的方案。这是我关于使用 CloudTable.ExecuteBatchAsync 将记录插入 Azure Table 存储的代码片段,您可以参考它。

Program.cs 主要

class Program
{
    static void Main(string[] args)
    {
        CloudStorageAccount storageAccount =
            CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("StorageConnectionString"));
        CloudTableClient tableClient = storageAccount.CreateCloudTableClient();
        TableResult result = new TableResult();
        CloudTable table = tableClient.GetTableReference("test");
        table.CreateIfNotExists();

        //Generate records to be inserted into Azure Table Storage
        var entities = Enumerable.Range(1, 10000).Select(i => new VerifyVariableEntity()
        {
            ConsumerId = String.Format("{0}", i),
            Score = String.Format("{0}", i * 2 + 2),
            PartitionKey = String.Format("{0}", i),
            RowKey = String.Format("{0}", i * 2 + 2)
        });

        //Group records by PartitionKey and prepare for executing batch operations
        var batches = TableBatchHelper<VerifyVariableEntity>.GetBatches(entities);

        //Execute batch operations in parallel
        Parallel.ForEach(batches, new ParallelOptions()
        {
            MaxDegreeOfParallelism = 5
        }, (batchOperation) =>
        {
            try
            {
                table.ExecuteBatch(batchOperation);
                Console.WriteLine("Writing {0} records", batchOperation.Count);
            }
            catch (Exception ex)
            {
                Console.WriteLine("ExecuteBatch throw a exception:" + ex.Message);
            }
        });
        Console.WriteLine("Done!");
        Console.WriteLine("Press any key to exit...");
        Console.ReadKey();
    }
}

TableBatchHelper.cs

public class TableBatchHelper<T> where T : ITableEntity
{
    const int batchMaxSize = 100;

    public static IEnumerable<TableBatchOperation> GetBatches(IEnumerable<T> items)
    {
        var list = new List<TableBatchOperation>();
        var partitionGroups = items.GroupBy(arg => arg.PartitionKey).ToArray();
        foreach (var group in partitionGroups)
        {
            T[] groupList = group.ToArray();
            int offSet = batchMaxSize;
            T[] entities = groupList.Take(offSet).ToArray();
            while (entities.Any())
            {
                var tableBatchOperation = new TableBatchOperation();
                foreach (var entity in entities)
                {
                    tableBatchOperation.Add(TableOperation.InsertOrReplace(entity));
                }
                list.Add(tableBatchOperation);
                entities = groupList.Skip(offSet).Take(batchMaxSize).ToArray();
                offSet += batchMaxSize;
            }
        }
        return list;
    }
}

注意:官方document关于插入一批实体有提到:

A single batch operation can include up to 100 entities.

All entities in a single batch operation must have the same partition key.

综上所述,请尝试检查它是否适合您。此外,您可以在控制台应用程序中捕获详细异常并通过 Fiddler 捕获 HTTP 请求,以在将记录插入 Azure Table 存储时捕获 HTTP 错误请求。

我遇到了同样的问题并通过 强制 ExecuteAsync 在结果存在之前等待结果..

table.ExecuteAsync(insertOperation).GetAwaiter().GetResult()