如何在 C# 中提高数据流插入 Bigquery table 的性能
How to improve the performance of data stream insert into Bigquery table in C#
我正在使用 C# .Net 客户端库将插入数据从本地文件流式传输到 Bigquery database.I 使用我的代码成功插入所有数据,但是,性能(插入速度)非常慢。我的加载速度为 20KB/秒。例如,我有一个大小为 323MB 的文件,程序需要大约 20 分钟才能 load.I 测试我的带宽,上传速度大约 3MB/秒 我想知道是否有提高性能的方法?(我想它应该比那快得多)
这是我的代码:
namespace BigQuery_Test
{
class StreamDatacs
{
private static ServiceAccountCredential credential;
public StreamDatacs()
{
Credential2().Wait();
}
public async Task Credential2()
{
string certificateFile = "C:\xxxxxx.p12";
string serviceAccountEmail = "xxxxxxxxxxxx@developer.gserviceaccount.com";
var certificate = new X509Certificate2(certificateFile, "notasecret", X509KeyStorageFlags.Exportable);
credential = new ServiceAccountCredential(
new ServiceAccountCredential.Initializer(serviceAccountEmail)
{
Scopes = new[] { StorageService.Scope.DevstorageReadWrite,
BigqueryService.Scope.Bigquery }
}.FromCertificate(certificate));
Console.WriteLine(credential.Token);
Console.WriteLine(credential.TokenServerUrl);
}
public void ReadData(string path)
{
var logs = new List<TableDataInsertAllRequest.RowsData>();
using (StreamReader sr = new StreamReader(path))
{
string line = "";
string [] aLog = new string[1000];
Log Loga = new Log();
int count = 0;
int i = 0;
while((line = sr.ReadLine()) != null)
{
var theLog = new TableDataInsertAllRequest.RowsData();
string[] token = line.Split('\t');
theLog.Json = new Dictionary<string, object>();
theLog.Json.Add("Timestamp", token[0]);
theLog.Json.Add("ClientIpAddress", token[1]);
theLog.Json.Add("Username", token[2]);
theLog.Json.Add("GroupID", token[3]);
theLog.Json.Add("CompanyID", token[4]);
theLog.Json.Add("FullOrSiteLogging", token[5]);
theLog.Json.Add("PolicyFlags", token[6]);
theLog.Json.Add("ActionsTaken", token[7]);
theLog.Json.Add("ResponseStatus", token[8]);
logs.Add(theLog);
count++;
if (count > 20000)
{
BQStreamInsert(logs);
logs.Clear();
count = 0;
}
}
}
Task.WaitAll(tasks.ToArray());
}
public void BQStreamInsert(List<TableDataInsertAllRequest.RowsData> rows)
{
string projectId= "ws-2015-logs";
string datasetId = "CompanyGroup1";
string tableId ="RawLogsTest7";
var service = new BigqueryService(new BaseClientService.Initializer()
{
HttpClientInitializer = credential,
ApplicationName = "BQ test"
});
try
{
var content = new TableDataInsertAllRequest();
content.Rows = rows;
content.Kind = "bigquery#tableDataInsertAllRequest";
content.IgnoreUnknownValues = true;
content.SkipInvalidRows = true;
var insertTask = service.Tabledata.InsertAll(content, projectId, datasetId, tableId);
TableDataInsertAllResponse response = insertTask.Execute();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}
}
在program.cs中:
class Program
{
static void Main(string[] args)
{
var sw = Stopwatch.StartNew();
StreamDatacs sd = new StreamDatacs();
sd.ReadData(@"C:15071600.csv");
Console.WriteLine(sw.ElapsedMilliseconds);
sw.Stop();
Console.Read();
}
在代码中,我逐行读取文件,每 20,000 行停止并插入 BQ,直到所有行都已 inserted.I 尝试了不同的缓冲区大小(500 行、1000 行、50,000线)在这里,但没有看到很大的区别。
感谢您的任何建议。非常感谢。
由于流式传输的有效负载大小有限,请参阅 Quota policy 谈论时间更容易,因为有效负载对我们双方的限制方式相同,但我也会提到其他副作用。
您需要正确实施限制:
- 最大行大小:1 MB
- HTTP 请求大小限制:10 MB
- 每个请求的最大行数:500
否则会报错。因此它被称为流式插入,因此您可以 运行 很多并行进程而不是一项大工作。
我们测量每个流媒体请求的时间在 1200-2500 毫秒之间,这在上个月是一致的,如您在图表中所见。
虽然我们看到了一些副作用:
- 请求随机失败,类型为 'Backend error'
- 请求随机失败,类型为 'Connection error'
- 请求随机失败,类型为 'timeout'(注意这里,因为只有一些行失败,而不是整个负载)
- 其他一些错误消息是非描述性的,它们非常模糊,对您没有帮助,请重试。
- 我们每天都会看到数百起此类故障,因此它们几乎是常态,与云运行状况无关。
对于所有这些,我们在付费 Google 企业支持中打开了案例,但不幸的是他们没有解决它。推荐的选项是带重试的指数退避,即使支持人员被告知这样做。就我个人而言,这并不能让我开心。
如果您选择的方法需要数小时,这意味着 it does not scale
,并且无法扩展。您需要重新考虑使用 async processes
的方法。为了尽快完成,你需要 运行 并行多个 worker,streaming 性能将是相同的。只要有 10 个 worker 并行,就意味着时间会减少 10 倍。
在后台 IO 绑定或 cpu 绑定任务中处理现在是大多数 Web 应用程序中的常见做法。有很多软件可以帮助构建后台作业,其中一些基于 Beanstalkd.
等消息系统
基本上,您需要在封闭的网络中分配插入作业,确定它们的优先级,然后使用 (运行) 它们。嗯,这正是 Beanstalkd 提供的。
Beanstalkd 提供了在管中组织作业的可能性,每个管对应一种作业类型。
您需要一个 API/producer 可以将作业放在管子上,假设是行的 json 表示。这是我们用例的杀手级功能。所以我们有一个 API 获取行,并将它们放在管子上,这只需要几毫秒,因此您可以实现快速响应时间。
另一方面,你现在在一些管子上有很多工作。你需要一个代理人。一个agent/consumer可以预约工作
它还可以帮助您进行作业管理和重试:成功处理作业后,消费者可以从管中删除该作业。在失败的情况下,消费者可以埋头工作。此作业不会被推回管中,但可以供进一步检查。
一个消费者可以释放一个作业,Beanstalkd 会把这个作业推回管中,让另一个客户端可以使用它。
可以在大多数常用语言中找到 Beanstalkd 客户端,web interface 可用于调试。
我正在使用 C# .Net 客户端库将插入数据从本地文件流式传输到 Bigquery database.I 使用我的代码成功插入所有数据,但是,性能(插入速度)非常慢。我的加载速度为 20KB/秒。例如,我有一个大小为 323MB 的文件,程序需要大约 20 分钟才能 load.I 测试我的带宽,上传速度大约 3MB/秒 我想知道是否有提高性能的方法?(我想它应该比那快得多)
这是我的代码:
namespace BigQuery_Test
{
class StreamDatacs
{
private static ServiceAccountCredential credential;
public StreamDatacs()
{
Credential2().Wait();
}
public async Task Credential2()
{
string certificateFile = "C:\xxxxxx.p12";
string serviceAccountEmail = "xxxxxxxxxxxx@developer.gserviceaccount.com";
var certificate = new X509Certificate2(certificateFile, "notasecret", X509KeyStorageFlags.Exportable);
credential = new ServiceAccountCredential(
new ServiceAccountCredential.Initializer(serviceAccountEmail)
{
Scopes = new[] { StorageService.Scope.DevstorageReadWrite,
BigqueryService.Scope.Bigquery }
}.FromCertificate(certificate));
Console.WriteLine(credential.Token);
Console.WriteLine(credential.TokenServerUrl);
}
public void ReadData(string path)
{
var logs = new List<TableDataInsertAllRequest.RowsData>();
using (StreamReader sr = new StreamReader(path))
{
string line = "";
string [] aLog = new string[1000];
Log Loga = new Log();
int count = 0;
int i = 0;
while((line = sr.ReadLine()) != null)
{
var theLog = new TableDataInsertAllRequest.RowsData();
string[] token = line.Split('\t');
theLog.Json = new Dictionary<string, object>();
theLog.Json.Add("Timestamp", token[0]);
theLog.Json.Add("ClientIpAddress", token[1]);
theLog.Json.Add("Username", token[2]);
theLog.Json.Add("GroupID", token[3]);
theLog.Json.Add("CompanyID", token[4]);
theLog.Json.Add("FullOrSiteLogging", token[5]);
theLog.Json.Add("PolicyFlags", token[6]);
theLog.Json.Add("ActionsTaken", token[7]);
theLog.Json.Add("ResponseStatus", token[8]);
logs.Add(theLog);
count++;
if (count > 20000)
{
BQStreamInsert(logs);
logs.Clear();
count = 0;
}
}
}
Task.WaitAll(tasks.ToArray());
}
public void BQStreamInsert(List<TableDataInsertAllRequest.RowsData> rows)
{
string projectId= "ws-2015-logs";
string datasetId = "CompanyGroup1";
string tableId ="RawLogsTest7";
var service = new BigqueryService(new BaseClientService.Initializer()
{
HttpClientInitializer = credential,
ApplicationName = "BQ test"
});
try
{
var content = new TableDataInsertAllRequest();
content.Rows = rows;
content.Kind = "bigquery#tableDataInsertAllRequest";
content.IgnoreUnknownValues = true;
content.SkipInvalidRows = true;
var insertTask = service.Tabledata.InsertAll(content, projectId, datasetId, tableId);
TableDataInsertAllResponse response = insertTask.Execute();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}
}
在program.cs中:
class Program
{
static void Main(string[] args)
{
var sw = Stopwatch.StartNew();
StreamDatacs sd = new StreamDatacs();
sd.ReadData(@"C:15071600.csv");
Console.WriteLine(sw.ElapsedMilliseconds);
sw.Stop();
Console.Read();
}
在代码中,我逐行读取文件,每 20,000 行停止并插入 BQ,直到所有行都已 inserted.I 尝试了不同的缓冲区大小(500 行、1000 行、50,000线)在这里,但没有看到很大的区别。 感谢您的任何建议。非常感谢。
由于流式传输的有效负载大小有限,请参阅 Quota policy 谈论时间更容易,因为有效负载对我们双方的限制方式相同,但我也会提到其他副作用。
您需要正确实施限制:
- 最大行大小:1 MB
- HTTP 请求大小限制:10 MB
- 每个请求的最大行数:500
否则会报错。因此它被称为流式插入,因此您可以 运行 很多并行进程而不是一项大工作。
我们测量每个流媒体请求的时间在 1200-2500 毫秒之间,这在上个月是一致的,如您在图表中所见。
虽然我们看到了一些副作用:
- 请求随机失败,类型为 'Backend error'
- 请求随机失败,类型为 'Connection error'
- 请求随机失败,类型为 'timeout'(注意这里,因为只有一些行失败,而不是整个负载)
- 其他一些错误消息是非描述性的,它们非常模糊,对您没有帮助,请重试。
- 我们每天都会看到数百起此类故障,因此它们几乎是常态,与云运行状况无关。
对于所有这些,我们在付费 Google 企业支持中打开了案例,但不幸的是他们没有解决它。推荐的选项是带重试的指数退避,即使支持人员被告知这样做。就我个人而言,这并不能让我开心。
如果您选择的方法需要数小时,这意味着 it does not scale
,并且无法扩展。您需要重新考虑使用 async processes
的方法。为了尽快完成,你需要 运行 并行多个 worker,streaming 性能将是相同的。只要有 10 个 worker 并行,就意味着时间会减少 10 倍。
在后台 IO 绑定或 cpu 绑定任务中处理现在是大多数 Web 应用程序中的常见做法。有很多软件可以帮助构建后台作业,其中一些基于 Beanstalkd.
等消息系统基本上,您需要在封闭的网络中分配插入作业,确定它们的优先级,然后使用 (运行) 它们。嗯,这正是 Beanstalkd 提供的。
Beanstalkd 提供了在管中组织作业的可能性,每个管对应一种作业类型。
您需要一个 API/producer 可以将作业放在管子上,假设是行的 json 表示。这是我们用例的杀手级功能。所以我们有一个 API 获取行,并将它们放在管子上,这只需要几毫秒,因此您可以实现快速响应时间。
另一方面,你现在在一些管子上有很多工作。你需要一个代理人。一个agent/consumer可以预约工作
它还可以帮助您进行作业管理和重试:成功处理作业后,消费者可以从管中删除该作业。在失败的情况下,消费者可以埋头工作。此作业不会被推回管中,但可以供进一步检查。
一个消费者可以释放一个作业,Beanstalkd 会把这个作业推回管中,让另一个客户端可以使用它。
可以在大多数常用语言中找到 Beanstalkd 客户端,web interface 可用于调试。