如何在 Java 中正确迭代 Bigquery TableResult

How to properly iterate Bigquery TableResult in Java

我正在尝试使用 getValues() 迭代 TableResult 中的行,如下所示。 如果我使用 getValues(),它只会检索第一页的行。我想使用 getValues() 而不是 iterateAll() 来迭代所有行。 在下面的代码中,问题是它的时间是无限的。没有结束。 while(results.hasNextPage()) 没有结束。下面的代码有什么问题?

    {
    query = "select from aa.bb.cc";
    QueryJobConfiguration queryConfig =
            QueryJobConfiguration.newBuilder(query)
                    .setPriority(QueryJobConfiguration.Priority.BATCH)
                    .build();
    TableResult results = bigquery.query(queryConfig);

    int i = 0;
    int j=0;
    while(results.hasNextPage()) {
        j++;
        System.out.println("page " + j);
        System.out.println("Data Extracted::" + i + " records");
        for (FieldValueList row : results.getNextPage().getValues()) {
            i++;
        }
    }
    System.out.println("Total Count::" + results.getTotalRows());
    System.out.println("Data Extracted::" + i + " records");
}

我在源 table 中只有 200,000 条记录。下面是输出,我强行停止了这个过程。

page 1
Data Extracted::0 records
page 2
Data Extracted::85242 records
page 3
Data Extracted::170484 records
page 4
Data Extracted::255726 records
page 5
Data Extracted::340968 records
page 6
Data Extracted::426210 records
page 7
Data Extracted::511452 records
page 8
Data Extracted::596694 records
.......
.......
.......
.......

简而言之,您需要使用 getNextPage() 变量更新 TableResults 变量。如果您不更新它,您将始终一遍又一遍地循环相同的结果。这就是您在输出中获得大量记录的原因。

如果您检查以下样本:Bigquery Pagination and Using Java Client Library。我们可以通过多种方式处理分页结果。虽然不是特定于单个 运行 查询。

如下代码所示,部分基于分页示例,您需要使用 getNextPage() 的输出来更新 results 变量并继续执行 while 内的下一次迭代直到它遍历除最后一页以外的所有页面。

QueryRun.Java

package com.projects;

// [START bigquery_query]
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration; 
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.BigQuery.QueryResultsOption;
import java.util.UUID;

import sun.jvm.hotspot.debugger.Page;

public class QueryRun {

  public static void main(String[] args) {

    String projectId = "bigquery-public-data";
    String datasetName = "covid19_ecdc_eu";
    String tableName = "covid_19_geographic_distribution_worldwide";
    String query =
        "SELECT * "
            + " FROM `"
            + projectId
            + "."
            + datasetName
            + "."
            + tableName
            + "`"
            + " LIMIT 100";
    System.out.println(query);
    query(query);
  }

  public static void query(String query) {
    try {
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
      QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query).build();

      // Create a job ID so that we can safely retry.
      JobId jobId = JobId.of(UUID.randomUUID().toString());
      Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

      TableResult results = queryJob.getQueryResults(QueryResultsOption.pageSize(10));

      int i = 0;
      int j =0; 

      // get all paged data except last line
      while(results.hasNextPage()) {
        j++;   
        for (FieldValueList row : results.getValues()) { 
            i++;
        }
        results = results.getNextPage();
        print_msg(i,j);
      }

      // last line run
      j++;
      for (FieldValueList row : results.getValues()) {  
        i++;
      }
      print_msg(i,j);

      System.out.println("Query performed successfully.");
    } catch (BigQueryException | InterruptedException e) {
      System.out.println("Query not performed \n" + e.toString());
    }
  }

  public static void print_msg(int i,int j)
  {
    System.out.println("page " + j);
    System.out.println("Data Extracted::" + i + " records"); 
  }

} 
// [END bigquery_query]

输出:

SELECT *  FROM `bigquery-public-data.covid19_ecdc_eu.covid_19_geographic_distribution_worldwide` LIMIT 100
page 1
Data Extracted::10 records
page 2
Data Extracted::20 records
page 3
Data Extracted::30 records
page 4
Data Extracted::40 records
page 5
Data Extracted::50 records
page 6
Data Extracted::60 records
page 7
Data Extracted::70 records
page 8
Data Extracted::80 records
page 9
Data Extracted::90 records
page 10
Data Extracted::100 records
Query performed successfully.

最后一点,没有关于查询分页的官方示例,所以我不完全确定使用 java 处理分页的推荐方法。它在 BigQuery for Java 文档页面上不是很清楚。如果您可以用您的分页方法更新您的问题,我将不胜感激。

如果您在 运行 附加示例中遇到问题,请参阅 Using the BigQuery Java client sample, its github 页面及其中的 pom.xml 文件,并检查您是否遵守它。