将数据从 bigquery 写入 csv 很慢

Writing data from bigquery to csv is slow

我写的代码表现得很奇怪而且很慢,我不明白为什么。 我想要做的是将数据从 bigquery(使用查询作为输入)下载到 CSV 文件,然后使用此 CSV 创建 url link,以便人们可以将其作为报告下载. 我正在尝试优化编写 CSV 的过程,因为它需要一些时间并且有一些奇怪的行为。

代码遍历 bigquery 结果并将每个结果传递给一个通道,以便将来 parsing/writing 使用 golang encoding/csv 包。 这是一些调试的相关部分

func (s *Service) generateReportWorker(ctx context.Context, query, reportName string) error {
    it, err := s.bigqueryClient.Read(ctx, query)
    if err != nil {
        return err
    }
    filename := generateReportFilename(reportName)
    gcsObj := s.gcsClient.Bucket(s.config.GcsBucket).Object(filename)
    wc := gcsObj.NewWriter(ctx)
    wc.ContentType = "text/csv"
    wc.ContentDisposition = "attachment"

    csvWriter := csv.NewWriter(wc)

    var doneCount uint64

    go backgroundTimer(ctx, it.TotalRows, &doneCount)

    rowJobs := make(chan []bigquery.Value, it.TotalRows)
    workers := 10
    wg := sync.WaitGroup{}
    wg.Add(workers)

    // start wrokers pool
    for i := 0; i < workers; i++ {
        go func(c context.Context, num int) {
            defer wg.Done()
            for row := range rowJobs {
                records := make([]string, len(row))
                for j, r := range records {
                    records[j] = fmt.Sprintf("%v", r)
                }
                s.mu.Lock()
                start := time.Now()
                if err := csvWriter.Write(records); err != {
                    log.Errorf("Error writing row: %v", err)
                }
                if time.Since(start) > time.Second {
                    fmt.Printf("worker %d took %v\n", num, time.Since(start))
                }
                s.mu.Unlock()
                atomic.AddUint64(&doneCount, 1)
            }
        }(ctx, i)
    }

    // read results from bigquery and add to the pool
    for {
        var row []bigquery.Value
        if err := it.Next(&row); err != nil {
            if err == iterator.Done || err == context.DeadlineExceeded {
                break
            }
            log.Errorf("Error loading next row from BQ: %v", err)
        }
        rowJobs <- row
    }

    fmt.Println("***done loop!***")

    close(rowJobs)

    wg.Wait()

    csvWriter.Flush()
    wc.Close()

    url := fmt.Sprintf("%s/%s/%s", s.config.BaseURL s.config.GcsBucket, filename)

    /// ....

}

func backgroundTimer(ctx context.Context, total uint64, done *uint64) {
    ticker := time.NewTicker(10 * time.Second)
    go func() {
        for {
            select {
            case <-ctx.Done():
                ticker.Stop()
                return
            case _ = <-ticker.C:
                fmt.Printf("progress (%d,%d)\n", atomic.LoadUint64(done), total)
            }
        }
    }()
}

bigquery 读取函数

func (c *Client) Read(ctx context.Context, query string) (*bigquery.RowIterator, error)  {
    job, err := c.bigqueryClient.Query(query).Run(ctx)
    if err != nil {
        return nil, err
    }
    it, err := job.Read(ctx)
    if err != nil {
        return nil, err
    }
    return it, nil
}

我 运行 此代码包含大约 400,000 行的查询。查询本身大约需要 10 秒,但整个过程大约需要 2 分钟 输出:

progress (112346,392565)
progress (123631,392565)
***done loop!***
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
worker 3 took 1m16.728143875s
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
worker 3 took 1m13.525662666s
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
worker 4 took 1m17.576536375s
progress (392565,392565)

你可以看到写第一个 112346 行很快,然后由于某种原因 worker 3 花了 1.16 分钟(!!!)来写一行,这导致其他 worker 等待互斥锁被释放,而且这种情况又发生了2次,导致整个过程需要2分多钟才能完成。

我不确定发生了什么,我该如何进一步调试它,为什么我在执行过程中出现了这个停顿?

按照@serge-v的建议,您可以将所有记录写入本地文件,然后将文件作为一个整体传输到GCS。为了使该过程在更短的时间内发生,您可以将文件分成多个块并可以使用此命令:gsutil -m cp -j where

gsutil 用于从命令行访问云存储

-m用于执行并行multi-threaded/multi-processing复制

cp用于拷贝文件

-j 将 gzip 传输编码应用于任何文件上传。这也节省了网络带宽,同时在 Cloud Storage 中保留未压缩的数据。

要在你的go程序中应用这个命令你可以参考这个Github link.

您可以尝试在您的 Go 程序中实现 profiling。概要分析将帮助您分析复杂性。也可以通过profiling找到程序中的耗时

由于您正在从 BigQuery 读取数百万行,因此您可以尝试使用 BigQuery Storage API。它提供比批量数据导出更快的 BigQuery-managed 存储访问。使用 BigQuery 存储 API 而不是您在 Go 程序中使用的迭代器可以使过程更快。

如需更多参考,您还可以查看 BigQuery 提供的 Query Optimization techniques