Google BigQuery: How to dynamically add a column to a BigQuery result

I have an application that queries our BQ datasets and stores the results in a BQ table. My code:

    BigQuery bigquery = bigQuery();
    TableId destinationTable = TableId.of(datasetName, TableName);

    QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query)
            .setDestinationTable(destinationTable)
            .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
            .build();
    TableResult results = bigquery.query(queryConfig);

While writing the results to the BQ dataset, I would like to append a column to every row, something like this:

queryConfig.addNewColumnToEveryRow("ID", "123");

How can I do this?

You should be able to do this by adding it to your `query` string:

String query = "SELECT yourOtherFields, 123 AS ID FROM yourSource";
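The one-liner above hard-codes the column into the SQL. If the column name or value is only known at runtime, you can wrap the original query as a subquery and project the constant column onto every row. A minimal sketch; the `addConstantColumn` helper is hypothetical, not part of the BigQuery API:

```java
public class QueryRewrite {

    // Hypothetical helper: wraps the original query as a subquery and
    // projects one extra constant column onto every row.
    static String addConstantColumn(String baseQuery, String columnName, String literal) {
        String inner = baseQuery.trim().replaceAll(";$", ""); // strip a trailing semicolon, if any
        return "SELECT t.*, " + literal + " AS " + columnName + " FROM (" + inner + ") AS t";
    }

    public static void main(String[] args) {
        String query = addConstantColumn(
            "SELECT corpus FROM `bigquery-public-data.samples.shakespeare`;", "ID", "'123'");
        System.out.println(query);
        // The resulting string can then be passed to QueryJobConfiguration.newBuilder(query)
        // together with the destination table and WRITE_APPEND disposition shown in the question.
    }
}
```
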

A valid solution is to modify the query itself, as shown in @Brent's solution. Another solution, mentioned by @Mikhail, is to post-process the result returned by the query execution. Please refer to the following code snippet, which post-processes the result programmatically (adding the new column) and loads the data into BigQuery.

The program flow is as follows:

  1. Execute the query and fetch the results.
  2. Iterate over the results and construct a JSON array.
  3. Write the JSON array to a local file in NDJSON format.
  4. Load the local file into the BigQuery table by creating a batch load job (implemented below). You could also use the streaming API.

import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.UUID;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import com.google.common.io.Files;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;


public class AddNewColumn {

    public static void main(String[] args) throws IOException {
        runSimpleQuery();
    }

    public static void runSimpleQuery() throws IOException {

        String query = "SELECT corpus, SUM(word_count) as word_count FROM `bigquery-public-data.samples.shakespeare` GROUP BY corpus ORDER BY word_count LIMIT 5;";
        simpleQuery(query);
    }

    public static void simpleQuery(String query) throws IOException {
        try {
            // Initialize client that will be used to send requests. This client only needs to be created
            // once, and can be reused for multiple requests.
            BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

            // Create the query job.
            QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query).build();

            // Execute the query.
            TableResult result = bigquery.query(queryConfig);
            System.out.println("\nQuery ran successfully");


            // Construct JSON array from the individual rows
            ArrayList<String> columnNames = new ArrayList<String>();
            result.getSchema().getFields().forEach(field ->  columnNames.add(field.getName())); // get column names

            JsonArray jsonArray = new JsonArray();

            result.iterateAll().forEach(row -> {
                JsonObject jsonObject = new JsonObject();
                jsonObject.addProperty("ID", 123); // the new column appended to every row
                columnNames.forEach(
                    column -> jsonObject.addProperty(column, row.get(column).getValue().toString())
                );
                jsonArray.add(jsonObject);
            });

            // Write the JSON array to a temporary file in NDJSON format
            try (FileWriter file = new FileWriter("./tempfile.json")) {
                jsonArray.forEach(jsonElement -> {
                    try {
                        file.write(jsonElement.toString());
                        file.write("\n");
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
            }

            System.out.println("Data written to temporary file.");

            // Create a load job to insert data
            // TODO: Change the destination dataset and table information.
            String datasetName = "MY_DATASET_NAME";
            String tableName = "MY_TABLE_NAME";
            Path jsonPath = FileSystems.getDefault().getPath(".", "tempfile.json");
            insertDataIntoDestinationTable(datasetName, tableName, jsonPath, FormatOptions.json());

        } catch (BigQueryException | InterruptedException e) {
            System.out.println("Query did not run \n" + e.toString());
        }
    }

    private static void insertDataIntoDestinationTable(String datasetName, String tableName, Path jsonPath, FormatOptions formatOptions) throws InterruptedException, IOException {
        try {
            // Initialize client that will be used to send requests. This client only needs to be created
            // once, and can be reused for multiple requests.
            BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
            TableId tableId = TableId.of(datasetName, tableName);

            WriteChannelConfiguration writeChannelConfiguration =
                WriteChannelConfiguration.newBuilder(tableId).setFormatOptions(formatOptions).build();

            // The location and JobName must be specified; other fields can be auto-detected.
            String jobName = "jobId_" + UUID.randomUUID().toString();
            JobId jobId = JobId.newBuilder().setLocation("us").setJob(jobName).build();

            // Imports a local file into a table.
            try (TableDataWriteChannel writer = bigquery.writer(jobId, writeChannelConfiguration);
                OutputStream stream = Channels.newOutputStream(writer)) {
                Files.copy(jsonPath.toFile(), stream);
            }

            // Get the Job created by the TableDataWriteChannel and wait for it to complete.
            Job job = bigquery.getJob(jobId);
            Job completedJob = job.waitFor();
            if (completedJob == null) {
                System.out.println("Job not executed since it no longer exists.");
                return;
            } else if (completedJob.getStatus().getError() != null) {
                System.out.println(
                    "BigQuery was unable to load the local file into the table due to an error: \n"
                        + completedJob.getStatus().getError());
                return;
            }
            
        } catch (BigQueryException e) {
            System.out.println("Local file not loaded. \n" + e.toString());
        }


    }
}

Output: the query results were successfully inserted into the destination table.
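As noted in step 4 above, the temporary-file plus batch load job could be replaced with the streaming API (`insertAll`), which sends the post-processed rows directly and avoids the local file. A minimal sketch, assuming the destination table already exists; the dataset/table names are the same placeholders as above, and the row values are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;

public class StreamWithExtraColumn {
    public static void main(String[] args) {
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        TableId tableId = TableId.of("MY_DATASET_NAME", "MY_TABLE_NAME");

        // One post-processed result row, with the extra column added
        // (illustrative values; in practice, build one map per result row).
        Map<String, Object> row = new HashMap<>();
        row.put("ID", 123);
        row.put("corpus", "hamlet");
        row.put("word_count", 32446);

        InsertAllResponse response =
            bigquery.insertAll(InsertAllRequest.newBuilder(tableId).addRow(row).build());
        if (response.hasErrors()) {
            response.getInsertErrors().forEach((index, errors) ->
                System.out.println("Row " + index + " failed: " + errors));
        }
    }
}
```

Note that streamed rows land in the streaming buffer and are billed differently from batch load jobs, so the load-job approach above remains the cheaper option for bulk inserts.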