如何在 C# 中提高数据流插入 Bigquery table 的性能

How to improve the performance of data stream insert into Bigquery table in C#

我正在使用 C# .Net 客户端库将插入数据从本地文件流式传输到 Bigquery database.I 使用我的代码成功插入所有数据,但是,性能(插入速度)非常慢。我的加载速度为 20KB/秒。例如,我有一个大小为 323MB 的文件,程序需要大约 20 分钟才能 load.I 测试我的带宽,上传速度大约 3MB/秒 我想知道是否有提高性能的方法?(我想它应该比那快得多)

这是我的代码:

namespace BigQuery_Test
{
    class StreamDatacs
    {


        private static ServiceAccountCredential credential;

        public StreamDatacs()
        {
            Credential2().Wait();


        }

        public async Task Credential2()
        {
            string certificateFile = "C:\xxxxxx.p12";
            string serviceAccountEmail = "xxxxxxxxxxxx@developer.gserviceaccount.com";
            var certificate = new X509Certificate2(certificateFile, "notasecret", X509KeyStorageFlags.Exportable);
            credential = new ServiceAccountCredential(
               new ServiceAccountCredential.Initializer(serviceAccountEmail)
               {
                   Scopes = new[] { StorageService.Scope.DevstorageReadWrite,
                   BigqueryService.Scope.Bigquery }
               }.FromCertificate(certificate));

            Console.WriteLine(credential.Token);
            Console.WriteLine(credential.TokenServerUrl);
        }
        public void ReadData(string path)        
        {

            var logs = new List<TableDataInsertAllRequest.RowsData>();
            using (StreamReader sr = new StreamReader(path))
            {
                string line = "";
                string [] aLog = new string[1000];

                Log Loga = new Log();
                int count = 0;
                int i = 0;
                while((line = sr.ReadLine()) != null)
                {
                    var theLog = new TableDataInsertAllRequest.RowsData();              
                    string[] token = line.Split('\t');
                    theLog.Json = new Dictionary<string, object>();
                    theLog.Json.Add("Timestamp", token[0]);
                    theLog.Json.Add("ClientIpAddress", token[1]);
                    theLog.Json.Add("Username", token[2]);
                    theLog.Json.Add("GroupID", token[3]);
                    theLog.Json.Add("CompanyID", token[4]);
                    theLog.Json.Add("FullOrSiteLogging", token[5]);
                    theLog.Json.Add("PolicyFlags", token[6]);
                    theLog.Json.Add("ActionsTaken", token[7]);
                    theLog.Json.Add("ResponseStatus", token[8]);                       
                    logs.Add(theLog);
                    count++;

                    if (count > 20000)
                    {

                        BQStreamInsert(logs);
                        logs.Clear();
                        count = 0;

                    }
                }

            }

            Task.WaitAll(tasks.ToArray());
        }
        public void BQStreamInsert(List<TableDataInsertAllRequest.RowsData> rows)
        {
            string projectId= "ws-2015-logs";
            string datasetId = "CompanyGroup1";
            string tableId ="RawLogsTest7";
            var service = new BigqueryService(new BaseClientService.Initializer()
            {
                HttpClientInitializer = credential,

                ApplicationName = "BQ test"
            });
            try
            {
                var content = new TableDataInsertAllRequest();
                content.Rows = rows;
                content.Kind = "bigquery#tableDataInsertAllRequest";
                content.IgnoreUnknownValues = true;
                content.SkipInvalidRows = true;

                var insertTask = service.Tabledata.InsertAll(content, projectId, datasetId, tableId);
                TableDataInsertAllResponse response = insertTask.Execute();

            }
            catch (Exception ex)
            {

                Console.WriteLine(ex.Message);
            }


        }
    }
}

在program.cs中:

 class Program
    {
        static void Main(string[] args)
        {

            var sw = Stopwatch.StartNew();
            StreamDatacs sd = new StreamDatacs();
            sd.ReadData(@"C:15071600.csv");   
            Console.WriteLine(sw.ElapsedMilliseconds);
            sw.Stop();
            Console.Read();

        }

在代码中,我逐行读取文件,每 20,000 行停止并插入 BQ,直到所有行都已 inserted.I 尝试了不同的缓冲区大小(500 行、1000 行、50,000线)在这里,但没有看到很大的区别。 感谢您的任何建议。非常感谢。

由于流式传输的有效负载大小有限,请参阅 Quota policy 谈论时间更容易,因为有效负载对我们双方的限制方式相同,但我也会提到其他副作用。

您需要正确实施限制:

  • 最大行大小:1 MB
  • HTTP 请求大小限制:10 MB
  • 每个请求的最大行数:500

否则会报错。因此它被称为流式插入,因此您可以 运行 很多并行进程而不是一项大工作。

我们测量每个流媒体请求的时间在 1200-2500 毫秒之间,这在上个月是一致的,如您在图表中所见。

虽然我们看到了一些副作用:

  • 请求随机失败,类型为 'Backend error'
  • 请求随机失败,类型为 'Connection error'
  • 请求随机失败,类型为 'timeout'(注意这里,因为只有一些行失败,而不是整个负载)
  • 其他一些错误消息是非描述性的,它们非常模糊,对您没有帮助,请重试。
  • 我们每天都会看到数百起此类故障,因此它们几乎是常态,与云运行状况无关。

对于所有这些,我们在付费 Google 企业支持中打开了案例,但不幸的是他们没有解决它。推荐的选项是带重试的指数退避,即使支持人员被告知这样做。就我个人而言,这并不能让我开心。


如果您选择的方法需要数小时,这意味着 it does not scale,并且无法扩展。您需要重新考虑使用 async processes 的方法。为了尽快完成,你需要 运行 并行多个 worker,streaming 性能将是相同的。只要有 10 个 worker 并行,就意味着时间会减少 10 倍。

在后台 IO 绑定或 cpu 绑定任务中处理现在是大多数 Web 应用程序中的常见做法。有很多软件可以帮助构建后台作业,其中一些基于 Beanstalkd.

等消息系统

基本上,您需要在封闭的网络中分配插入作业,确定它们的优先级,然后使用 (运行) 它们。嗯,这正是 Beanstalkd 提供的。

Beanstalkd 提供了在管中组织作业的可能性,每个管对应一种作业类型。

您需要一个 API/producer 可以将作业放在管子上,假设是行的 json 表示。这是我们用例的杀手级功能。所以我们有一个 API 获取行,并将它们放在管子上,这只需要几毫秒,因此您可以实现快速响应时间。

另一方面,你现在在一些管子上有很多工作。你需要一个代理人。一个agent/consumer可以预约工作

它还可以帮助您进行作业管理和重试:成功处理作业后,消费者可以从管中删除该作业。在失败的情况下,消费者可以埋头工作。此作业不会被推回管中,但可以供进一步检查。

一个消费者可以释放一个作业,Beanstalkd 会把这个作业推回管中,让另一个客户端可以使用它。

可以在大多数常用语言中找到 Beanstalkd 客户端,web interface 可用于调试。