Google BigQuery returns 只有部分 table 数据与 C# 应用程序使用 .net 客户端库

Google BigQuery returns only partial table data with C# application using .net Client Library

我正在尝试执行查询(具有 10 个字段的基本 select 语句)。我的 table 包含超过 50 万行。 C# 应用程序 return 的响应只有 4260 行。然而 Web UI returns 的所有记录。

为什么我的代码return只有部分数据,select所有记录并加载到C#数据Table的最佳方法是什么?如果有任何代码片段,那将对我更有帮助。

using Google.Apis.Auth.OAuth2;
using System.IO;
using System.Threading;
using Google.Apis.Bigquery.v2;
using Google.Apis.Bigquery.v2.Data;
using System.Data;
using Google.Apis.Services;
using System;
using System.Security.Cryptography.X509Certificates;

namespace GoogleBigQuery
{
    public class Class1
    {
        private static void Main()
        {
            try
            {
                Console.WriteLine("Start Time: {0}", DateTime.Now.ToString());
                String serviceAccountEmail = "SERVICE ACCOUNT EMAIL";

                var certificate = new X509Certificate2(@"KeyFile.p12", "notasecret", X509KeyStorageFlags.Exportable);

                ServiceAccountCredential credential = new ServiceAccountCredential(
                   new ServiceAccountCredential.Initializer(serviceAccountEmail)
                   {
                       Scopes = new[] { BigqueryService.Scope.Bigquery, BigqueryService.Scope.BigqueryInsertdata, BigqueryService.Scope.CloudPlatform, BigqueryService.Scope.DevstorageFullControl }
                   }.FromCertificate(certificate));

                BigqueryService Service = new BigqueryService(new BaseClientService.Initializer()
                {
                    HttpClientInitializer = credential,
                    ApplicationName = "PROJECT NAME"
                });

                string query = "SELECT * FROM [publicdata:samples.shakespeare]";

                JobsResource j = Service.Jobs;

                QueryRequest qr = new QueryRequest();

                string ProjectID = "PROJECT ID";

                qr.Query = query;
                qr.MaxResults = Int32.MaxValue;
                qr.TimeoutMs = Int32.MaxValue;

                DataTable DT = new DataTable();
                int i = 0;

                QueryResponse response = j.Query(qr, ProjectID).Execute();

                string pageToken = null;

                if (response.JobComplete == true)
                {
                    if (response != null)
                    {
                        int colCount = response.Schema.Fields.Count;

                        if (DT == null)
                            DT = new DataTable();

                        if (DT.Columns.Count == 0)
                        {
                            foreach (var Column in response.Schema.Fields)
                            {
                                DT.Columns.Add(Column.Name);
                            }
                        }

                        pageToken = response.PageToken;

                        if (response.Rows != null)
                        {
                            foreach (TableRow row in response.Rows)
                            {
                                DataRow dr = DT.NewRow();

                                for (i = 0; i < colCount; i++)
                                {
                                    dr[i] = row.F[i].V;
                                }

                                DT.Rows.Add(dr);
                            }
                        }
                        Console.WriteLine("No of Records are Readed: {0} @ {1}", DT.Rows.Count.ToString(), DateTime.Now.ToString());

                        while (true)
                        {
                            int StartIndexForQuery = DT.Rows.Count;
                            Google.Apis.Bigquery.v2.JobsResource.GetQueryResultsRequest SubQR = Service.Jobs.GetQueryResults(response.JobReference.ProjectId, response.JobReference.JobId);
                            SubQR.StartIndex = (ulong)StartIndexForQuery;
                            //SubQR.MaxResults = Int32.MaxValue;
                            GetQueryResultsResponse QueryResultResponse = SubQR.Execute();

                            if (QueryResultResponse != null)
                            {
                                if (QueryResultResponse.Rows != null)
                                {
                                    foreach (TableRow row in QueryResultResponse.Rows)
                                    {
                                        DataRow dr = DT.NewRow();

                                        for (i = 0; i < colCount; i++)
                                        {
                                            dr[i] = row.F[i].V;
                                        }

                                        DT.Rows.Add(dr);
                                    }
                                }

                                Console.WriteLine("No of Records are Readed: {0} @ {1}", DT.Rows.Count.ToString(), DateTime.Now.ToString());

                                if (null == QueryResultResponse.PageToken)
                                {
                                    break;
                                }
                            }
                            else
                            {
                                break;
                            }
                        }
                    }
                    else
                    {
                        Console.WriteLine("Response is null");
                    }
                }


                int TotalCount = 0;

                if (DT != null && DT.Rows.Count > 0)
                {
                    TotalCount = DT.Rows.Count;
                }
                else
                {
                    TotalCount = 0;
                }

                Console.WriteLine("End Time: {0}", DateTime.Now.ToString());
                Console.WriteLine("No. of records readed from google bigquery service: " + TotalCount.ToString());
            }
            catch (Exception e)
            {
                Console.WriteLine("Error Occurred: " + e.Message);
            }

            Console.ReadLine();
        }
    }
}

在此示例查询中,从 public 数据集中获取结果,在 table 中包含 164656 行,但第一次仅响应 returns 85000 行,然后再次查询以得到第二组结果。 (但不知道这是获得所有结果的唯一解决方案)。

在这个示例中只包含 4 个字段,即使它没有 return 所有行,在我的例子中 table 包含超过 15 个字段,我得到了 ~4000 行的响应~10k 行,我需要一次又一次地查询以获得 selecting 1000 行的剩余结果,在我的方法中最多需要 2 分钟,所以我期望最好的方法是 select 中的所有记录单一回应。

Web UI 自动 flattens 数据。这意味着您会看到每个嵌套字段有多行。

当您通过 API 运行 相同的查询时,它不会被展平,并且您得到的行数会更少,因为嵌套字段作为对象返回。您应该检查一下您是否属于这种情况。

另一个是您确实需要对结果进行分页。 Paging through list results 对此进行了解释。

如果您只想做一项工作,那么您应该将查询输出写入 table,而不是将 table 导出为 JSON,然后从 GCS 下载导出.

用户回答#:Pentium10

无法一次性 运行 查询和 select 大响应。您可以对结果进行分页,或者如果您可以创建一个导出到文件的作业,则使用在您的应用程序中生成的文件。导出是免费的。

逐步 运行 大型查询并将结果导出到存储在 GCS 上的文件:

1) 在作业配置中将 allowLargeResults 设置为 true。您还必须使用 allowLargeResults 标志指定目的地 table。

示例:

"configuration": 
  {
    "query": 
    {
      "allowLargeResults": true,
      "query": "select uid from [project:dataset.table]"
      "destinationTable": [project:dataset.table]

    }
  }

2) 现在您的数据位于您设置的目的地 table 中。您需要创建一个新作业,并设置导出 属性 以便能够将 table 导出到文件。导出是免费的,但您需要 Google 激活云存储才能将生成的文件放在那里。

3) 最后从 GCS 下载大文件。

轮到我设计解决方案以获得更好的结果。

希望这可能对某人有所帮助。可以使用 PageToken 检索下一组分页结果。下面是如何使用 PageToken 的示例代码。虽然,我喜欢免费出口的想法。在这里,我将行写入平面文件,但您可以将它们添加到您的数据表中。显然,尽管将大型 DataTable 保留在内存中是一个坏主意。

    public void ExecuteSQL(BigqueryService bqservice, String ProjectID)
    {
        string sSql = "SELECT r.Dealname, r.poolnumber, r.loanid FROM [MBS_Dataset.tblRemitData] R left join each [MBS_Dataset.tblOrigData] o on R.Dealname = o.Dealname and R.Poolnumber = o.Poolnumber and R.LoanID = o.LoanID Order by o.Dealname, o.poolnumber, o.loanid limit 100000";

        QueryRequest _r = new QueryRequest();
        _r.Query = sSql;
        QueryResponse _qr = bqservice.Jobs.Query(_r, ProjectID).Execute();

        string pageToken = null;
        if (_qr.JobComplete != true)
        {
            //job not finished yet! expecting more data
            while (true)
            {
                var resultReq = bqservice.Jobs.GetQueryResults(_qr.JobReference.ProjectId, _qr.JobReference.JobId);
                resultReq.PageToken = pageToken;
                var result = resultReq.Execute();

                if (result.JobComplete == true)
                {
                    WriteRows(result.Rows, result.Schema.Fields);
                    pageToken = result.PageToken;
                    if (pageToken == null)
                        break;
                }
            }
        }
        else
        {
            List<string> _fieldNames = _qr.Schema.Fields.ToList().Select(x => x.Name).ToList();
            WriteRows(_qr.Rows, _qr.Schema.Fields);
        }
    }