将数据从 bigquery 写入 csv 很慢
Writing data from bigquery to csv is slow
我写的代码表现得很奇怪而且很慢,我不明白为什么。
我想要做的是将数据从 bigquery(使用查询作为输入)下载到 CSV 文件,然后使用此 CSV 创建 url link,以便人们可以将其作为报告下载.
我正在尝试优化编写 CSV 的过程,因为它需要一些时间并且有一些奇怪的行为。
代码遍历 bigquery 结果并将每个结果传递给一个通道,以便将来 parsing/writing 使用 golang encoding/csv
包。
这是一些调试的相关部分
func (s *Service) generateReportWorker(ctx context.Context, query, reportName string) error {
it, err := s.bigqueryClient.Read(ctx, query)
if err != nil {
return err
}
filename := generateReportFilename(reportName)
gcsObj := s.gcsClient.Bucket(s.config.GcsBucket).Object(filename)
wc := gcsObj.NewWriter(ctx)
wc.ContentType = "text/csv"
wc.ContentDisposition = "attachment"
csvWriter := csv.NewWriter(wc)
var doneCount uint64
go backgroundTimer(ctx, it.TotalRows, &doneCount)
rowJobs := make(chan []bigquery.Value, it.TotalRows)
workers := 10
wg := sync.WaitGroup{}
wg.Add(workers)
// start wrokers pool
for i := 0; i < workers; i++ {
go func(c context.Context, num int) {
defer wg.Done()
for row := range rowJobs {
records := make([]string, len(row))
for j, r := range records {
records[j] = fmt.Sprintf("%v", r)
}
s.mu.Lock()
start := time.Now()
if err := csvWriter.Write(records); err != {
log.Errorf("Error writing row: %v", err)
}
if time.Since(start) > time.Second {
fmt.Printf("worker %d took %v\n", num, time.Since(start))
}
s.mu.Unlock()
atomic.AddUint64(&doneCount, 1)
}
}(ctx, i)
}
// read results from bigquery and add to the pool
for {
var row []bigquery.Value
if err := it.Next(&row); err != nil {
if err == iterator.Done || err == context.DeadlineExceeded {
break
}
log.Errorf("Error loading next row from BQ: %v", err)
}
rowJobs <- row
}
fmt.Println("***done loop!***")
close(rowJobs)
wg.Wait()
csvWriter.Flush()
wc.Close()
url := fmt.Sprintf("%s/%s/%s", s.config.BaseURL s.config.GcsBucket, filename)
/// ....
}
func backgroundTimer(ctx context.Context, total uint64, done *uint64) {
ticker := time.NewTicker(10 * time.Second)
go func() {
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case _ = <-ticker.C:
fmt.Printf("progress (%d,%d)\n", atomic.LoadUint64(done), total)
}
}
}()
}
bigquery 读取函数
func (c *Client) Read(ctx context.Context, query string) (*bigquery.RowIterator, error) {
job, err := c.bigqueryClient.Query(query).Run(ctx)
if err != nil {
return nil, err
}
it, err := job.Read(ctx)
if err != nil {
return nil, err
}
return it, nil
}
我 运行 此代码包含大约 400,000 行的查询。查询本身大约需要 10 秒,但整个过程大约需要 2 分钟
输出:
progress (112346,392565)
progress (123631,392565)
***done loop!***
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
worker 3 took 1m16.728143875s
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
worker 3 took 1m13.525662666s
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
worker 4 took 1m17.576536375s
progress (392565,392565)
你可以看到写第一个 112346 行很快,然后由于某种原因 worker 3 花了 1.16 分钟(!!!)来写一行,这导致其他 worker 等待互斥锁被释放,而且这种情况又发生了2次,导致整个过程需要2分多钟才能完成。
我不确定发生了什么,我该如何进一步调试它,为什么我在执行过程中出现了这个停顿?
按照@serge-v的建议,您可以将所有记录写入本地文件,然后将文件作为一个整体传输到GCS。为了使该过程在更短的时间内发生,您可以将文件分成多个块并可以使用此命令:gsutil -m cp -j where
gsutil 用于从命令行访问云存储
-m用于执行并行multi-threaded/multi-processing复制
cp用于拷贝文件
-j 将 gzip 传输编码应用于任何文件上传。这也节省了网络带宽,同时在 Cloud Storage 中保留未压缩的数据。
要在你的go程序中应用这个命令你可以参考这个Github link.
您可以尝试在您的 Go 程序中实现 profiling。概要分析将帮助您分析复杂性。也可以通过profiling找到程序中的耗时
由于您正在从 BigQuery 读取数百万行,因此您可以尝试使用 BigQuery Storage API。它提供比批量数据导出更快的 BigQuery-managed 存储访问。使用 BigQuery 存储 API 而不是您在 Go 程序中使用的迭代器可以使过程更快。
如需更多参考,您还可以查看 BigQuery 提供的 Query Optimization techniques。
我写的代码表现得很奇怪而且很慢,我不明白为什么。 我想要做的是将数据从 bigquery(使用查询作为输入)下载到 CSV 文件,然后使用此 CSV 创建 url link,以便人们可以将其作为报告下载. 我正在尝试优化编写 CSV 的过程,因为它需要一些时间并且有一些奇怪的行为。
代码遍历 bigquery 结果并将每个结果传递给一个通道,以便将来 parsing/writing 使用 golang encoding/csv
包。
这是一些调试的相关部分
func (s *Service) generateReportWorker(ctx context.Context, query, reportName string) error {
it, err := s.bigqueryClient.Read(ctx, query)
if err != nil {
return err
}
filename := generateReportFilename(reportName)
gcsObj := s.gcsClient.Bucket(s.config.GcsBucket).Object(filename)
wc := gcsObj.NewWriter(ctx)
wc.ContentType = "text/csv"
wc.ContentDisposition = "attachment"
csvWriter := csv.NewWriter(wc)
var doneCount uint64
go backgroundTimer(ctx, it.TotalRows, &doneCount)
rowJobs := make(chan []bigquery.Value, it.TotalRows)
workers := 10
wg := sync.WaitGroup{}
wg.Add(workers)
// start wrokers pool
for i := 0; i < workers; i++ {
go func(c context.Context, num int) {
defer wg.Done()
for row := range rowJobs {
records := make([]string, len(row))
for j, r := range records {
records[j] = fmt.Sprintf("%v", r)
}
s.mu.Lock()
start := time.Now()
if err := csvWriter.Write(records); err != {
log.Errorf("Error writing row: %v", err)
}
if time.Since(start) > time.Second {
fmt.Printf("worker %d took %v\n", num, time.Since(start))
}
s.mu.Unlock()
atomic.AddUint64(&doneCount, 1)
}
}(ctx, i)
}
// read results from bigquery and add to the pool
for {
var row []bigquery.Value
if err := it.Next(&row); err != nil {
if err == iterator.Done || err == context.DeadlineExceeded {
break
}
log.Errorf("Error loading next row from BQ: %v", err)
}
rowJobs <- row
}
fmt.Println("***done loop!***")
close(rowJobs)
wg.Wait()
csvWriter.Flush()
wc.Close()
url := fmt.Sprintf("%s/%s/%s", s.config.BaseURL s.config.GcsBucket, filename)
/// ....
}
func backgroundTimer(ctx context.Context, total uint64, done *uint64) {
ticker := time.NewTicker(10 * time.Second)
go func() {
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case _ = <-ticker.C:
fmt.Printf("progress (%d,%d)\n", atomic.LoadUint64(done), total)
}
}
}()
}
bigquery 读取函数
func (c *Client) Read(ctx context.Context, query string) (*bigquery.RowIterator, error) {
job, err := c.bigqueryClient.Query(query).Run(ctx)
if err != nil {
return nil, err
}
it, err := job.Read(ctx)
if err != nil {
return nil, err
}
return it, nil
}
我 运行 此代码包含大约 400,000 行的查询。查询本身大约需要 10 秒,但整个过程大约需要 2 分钟 输出:
progress (112346,392565)
progress (123631,392565)
***done loop!***
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
worker 3 took 1m16.728143875s
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
worker 3 took 1m13.525662666s
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
worker 4 took 1m17.576536375s
progress (392565,392565)
你可以看到写第一个 112346 行很快,然后由于某种原因 worker 3 花了 1.16 分钟(!!!)来写一行,这导致其他 worker 等待互斥锁被释放,而且这种情况又发生了2次,导致整个过程需要2分多钟才能完成。
我不确定发生了什么,我该如何进一步调试它,为什么我在执行过程中出现了这个停顿?
按照@serge-v的建议,您可以将所有记录写入本地文件,然后将文件作为一个整体传输到GCS。为了使该过程在更短的时间内发生,您可以将文件分成多个块并可以使用此命令:gsutil -m cp -j where
gsutil 用于从命令行访问云存储
-m用于执行并行multi-threaded/multi-processing复制
cp用于拷贝文件
-j 将 gzip 传输编码应用于任何文件上传。这也节省了网络带宽,同时在 Cloud Storage 中保留未压缩的数据。
要在你的go程序中应用这个命令你可以参考这个Github link.
您可以尝试在您的 Go 程序中实现 profiling。概要分析将帮助您分析复杂性。也可以通过profiling找到程序中的耗时
由于您正在从 BigQuery 读取数百万行,因此您可以尝试使用 BigQuery Storage API。它提供比批量数据导出更快的 BigQuery-managed 存储访问。使用 BigQuery 存储 API 而不是您在 Go 程序中使用的迭代器可以使过程更快。
如需更多参考,您还可以查看 BigQuery 提供的 Query Optimization techniques。