Google BigQuery 如何向 BigQuery 结果动态添加列
Google BigQuery How to dynamically Add Column to Bigquery Result
我有一个应用程序可以查询我们的 BQ 数据集并将结果存储到 BQ 表中:
我的代码:
BigQuery bigquery = bigQuery();
TableId destinationTable = TableId.of(datasetName, TableName);
QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query)
.setDestinationTable(destinationTable).setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
.build();
TableResult results = bigquery.query(queryConfig);
在将结果写入 BQ 数据集时,我想将一列附加到每一行,类似于这样:
// Illustrative call only: this is the API the asker wishes existed ("something like this"),
// presumably not a real method of QueryJobConfiguration — verify against the client library.
queryConfig.addNewColumnToEveryRow("ID", "123");
怎么做?
只需把这一列直接添加到您的 query 字符串中即可。
// The literal 123 is selected under the alias ID, so every result row carries the extra column.
String query = "SELECT yourOtherFields, 123 AS ID FROM yourSource";
有效的解决方案是更改查询本身,如@Brent 的解决方案所示。 @Mikhail 提到的另一个解决方案是 post-process 查询执行的返回结果。请参考以下代码片段,以编程方式 post-process(添加新列)并将数据加载到 BigQuery。
程序流程如下
- 执行查询并获取结果。
- 迭代结果并构造一个 JSON 数组。
- 将JSON数组以NDJSON格式写入本地文件。
- 通过创建批量加载作业(batch load job,实现见下文)将本地文件加载到 BigQuery 表中;您也可以改用流式(streaming)API 来加载数据。
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.UUID;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import com.google.common.io.Files;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
/**
 * Demonstrates adding a constant column to BigQuery query results by post-processing them on the
 * client: the query is executed, every row is converted to a JSON object with an extra
 * {@code ID} property, the objects are written to a local NDJSON file, and that file is loaded
 * into a destination table with a batch load job.
 */
public class AddNewColumn {

  /** Name of the column appended to every row. */
  private static final String EXTRA_COLUMN_NAME = "ID";

  /** Value stored in the appended column. */
  private static final int EXTRA_COLUMN_VALUE = 123;

  /** Name of the local NDJSON file used to stage the rows before the load job. */
  private static final String TEMP_FILE_NAME = "tempfile.json";

  /**
   * Entry point; runs the sample query.
   *
   * @param args unused
   * @throws IOException if the staging file cannot be written or read
   */
  public static void main(String[] args) throws IOException {
    runSimpleQuery();
  }

  /**
   * Runs {@link #simpleQuery(String)} with a fixed query against the public Shakespeare sample.
   *
   * @throws IOException if the staging file cannot be written or read
   */
  public static void runSimpleQuery() throws IOException {
    String query = "SELECT corpus, SUM(word_count) as word_count FROM `bigquery-public-data.samples.shakespeare` GROUP BY corpus ORDER BY word_count LIMIT 5;";
    simpleQuery(query);
  }

  /**
   * Executes {@code query}, adds the extra column to every returned row, stages the rows in a
   * local NDJSON file and loads that file into the destination table.
   *
   * <p>BigQuery failures and interruption are reported on standard output rather than thrown.
   *
   * @param query the SQL text to execute
   * @throws IOException if the staging file cannot be written or read
   */
  public static void simpleQuery(String query) throws IOException {
    try {
      // Initialize client that will be used to send requests. This client only needs to be
      // created once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
      QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query).build();
      TableResult result = bigquery.query(queryConfig);
      System.out.println("\nQuery ran successfully");

      // Collect the column names once so each row can be read by name.
      ArrayList<String> columnNames = new ArrayList<>();
      result.getSchema().getFields().forEach(field -> columnNames.add(field.getName()));

      // Convert every row into a JSON object carrying the extra column.
      JsonArray jsonArray = new JsonArray();
      result.iterateAll().forEach(row -> {
        JsonObject jsonObject = new JsonObject();
        jsonObject.addProperty(EXTRA_COLUMN_NAME, EXTRA_COLUMN_VALUE);
        for (String column : columnNames) {
          // A NULL cell yields a null value; emit JSON null instead of failing on toString().
          // NOTE(review): nested or repeated fields are still flattened via toString() — confirm
          // this is acceptable for the queries this is used with.
          Object value = row.get(column).getValue();
          String text = (value == null) ? null : value.toString();
          jsonObject.addProperty(column, text);
        }
        jsonArray.add(jsonObject);
      });

      // Write one JSON object per line (NDJSON). The writer is closed even when a write fails,
      // and a write failure now propagates instead of leaving a truncated file to be loaded.
      // UTF-8 is requested explicitly so the output does not depend on the platform charset.
      // Fully qualified names are used because the simple name Files is already taken by the
      // Guava import used for the upload below.
      Path jsonPath = FileSystems.getDefault().getPath(".", TEMP_FILE_NAME);
      try (java.io.Writer out = java.nio.file.Files.newBufferedWriter(
          jsonPath, java.nio.charset.StandardCharsets.UTF_8)) {
        for (int i = 0; i < jsonArray.size(); i++) {
          out.write(jsonArray.get(i).toString());
          out.write("\n");
        }
      }
      System.out.println("Data written to temporary file.");

      // Create a load job to insert data
      // TODO: Change the destination dataset and table information.
      String datasetName = "MY_DATASET_NAME";
      String tableName = "MY_TABLE_NAME";
      insertDataIntoDestinationTable(datasetName, tableName, jsonPath, FormatOptions.json());
    } catch (BigQueryException e) {
      System.out.println("Query did not run \n" + e.toString());
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can still observe the interruption.
      Thread.currentThread().interrupt();
      System.out.println("Query did not run \n" + e.toString());
    }
  }

  /**
   * Uploads a local file into a BigQuery table through a load job and waits for the job to end.
   *
   * <p>BigQuery failures are reported on standard output rather than thrown.
   *
   * @param datasetName dataset containing the destination table
   * @param tableName destination table
   * @param jsonPath local file to upload
   * @param formatOptions format of the local file
   * @throws InterruptedException if the thread is interrupted while waiting for the job
   * @throws IOException if the local file cannot be read or streamed
   */
  private static void insertDataIntoDestinationTable(String datasetName, String tableName, Path jsonPath, FormatOptions formatOptions) throws InterruptedException, IOException {
    try {
      // Initialize client that will be used to send requests. This client only needs to be
      // created once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
      TableId tableId = TableId.of(datasetName, tableName);
      WriteChannelConfiguration writeChannelConfiguration =
          WriteChannelConfiguration.newBuilder(tableId).setFormatOptions(formatOptions).build();

      // The location and JobName must be specified; other fields can be auto-detected.
      String jobName = "jobId_" + UUID.randomUUID().toString();
      JobId jobId = JobId.newBuilder().setLocation("us").setJob(jobName).build();

      // Imports a local file into a table.
      try (TableDataWriteChannel writer = bigquery.writer(jobId, writeChannelConfiguration);
          OutputStream stream = Channels.newOutputStream(writer)) {
        Files.copy(jsonPath.toFile(), stream);
      }

      // Get the Job created by the TableDataWriteChannel and wait for it to complete.
      Job job = bigquery.getJob(jobId);
      Job completedJob = (job == null) ? null : job.waitFor();
      if (completedJob == null) {
        System.out.println("Job not executed since it no longer exists.");
        return;
      }
      if (completedJob.getStatus().getError() != null) {
        // Read the error from the completed job; the handle fetched before waiting may not
        // carry the final status.
        System.out.println(
            "BigQuery was unable to load local file to the table due to an error: \n"
                + completedJob.getStatus().getError());
      }
    } catch (BigQueryException e) {
      System.out.println("Local file not loaded. \n" + e.toString());
    }
  }
}
输出:
查询结果已成功插入目标table。
我有一个应用程序可以查询我们的 BQ 数据集并将结果存储到 BQ 表中: 我的代码:
BigQuery bigquery = bigQuery();
TableId destinationTable = TableId.of(datasetName, TableName);
QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query)
.setDestinationTable(destinationTable).setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
.build();
TableResult results = bigquery.query(queryConfig);
在将结果写入 BQ 数据集时,我想将一列附加到每一行,类似于这样:
// Illustrative call only: this is the API the asker wishes existed ("something like this"),
// presumably not a real method of QueryJobConfiguration — verify against the client library.
queryConfig.addNewColumnToEveryRow("ID", "123");
怎么做?
只需把这一列直接添加到您的 query 字符串中即可。
// The literal 123 is selected under the alias ID, so every result row carries the extra column.
String query = "SELECT yourOtherFields, 123 AS ID FROM yourSource";
有效的解决方案是更改查询本身,如@Brent 的解决方案所示。 @Mikhail 提到的另一个解决方案是 post-process 查询执行的返回结果。请参考以下代码片段,以编程方式 post-process(添加新列)并将数据加载到 BigQuery。
程序流程如下
- 执行查询并获取结果。
- 迭代结果并构造一个 JSON 数组。
- 将JSON数组以NDJSON格式写入本地文件。
- 通过创建批量加载作业(batch load job,实现见下文)将本地文件加载到 BigQuery 表中;您也可以改用流式(streaming)API 来加载数据。
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.UUID;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import com.google.common.io.Files;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
/**
 * Demonstrates adding a constant column to BigQuery query results by post-processing them on the
 * client: the query is executed, every row is converted to a JSON object with an extra
 * {@code ID} property, the objects are written to a local NDJSON file, and that file is loaded
 * into a destination table with a batch load job.
 */
public class AddNewColumn {

  /** Name of the column appended to every row. */
  private static final String EXTRA_COLUMN_NAME = "ID";

  /** Value stored in the appended column. */
  private static final int EXTRA_COLUMN_VALUE = 123;

  /** Name of the local NDJSON file used to stage the rows before the load job. */
  private static final String TEMP_FILE_NAME = "tempfile.json";

  /**
   * Entry point; runs the sample query.
   *
   * @param args unused
   * @throws IOException if the staging file cannot be written or read
   */
  public static void main(String[] args) throws IOException {
    runSimpleQuery();
  }

  /**
   * Runs {@link #simpleQuery(String)} with a fixed query against the public Shakespeare sample.
   *
   * @throws IOException if the staging file cannot be written or read
   */
  public static void runSimpleQuery() throws IOException {
    String query = "SELECT corpus, SUM(word_count) as word_count FROM `bigquery-public-data.samples.shakespeare` GROUP BY corpus ORDER BY word_count LIMIT 5;";
    simpleQuery(query);
  }

  /**
   * Executes {@code query}, adds the extra column to every returned row, stages the rows in a
   * local NDJSON file and loads that file into the destination table.
   *
   * <p>BigQuery failures and interruption are reported on standard output rather than thrown.
   *
   * @param query the SQL text to execute
   * @throws IOException if the staging file cannot be written or read
   */
  public static void simpleQuery(String query) throws IOException {
    try {
      // Initialize client that will be used to send requests. This client only needs to be
      // created once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
      QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query).build();
      TableResult result = bigquery.query(queryConfig);
      System.out.println("\nQuery ran successfully");

      // Collect the column names once so each row can be read by name.
      ArrayList<String> columnNames = new ArrayList<>();
      result.getSchema().getFields().forEach(field -> columnNames.add(field.getName()));

      // Convert every row into a JSON object carrying the extra column.
      JsonArray jsonArray = new JsonArray();
      result.iterateAll().forEach(row -> {
        JsonObject jsonObject = new JsonObject();
        jsonObject.addProperty(EXTRA_COLUMN_NAME, EXTRA_COLUMN_VALUE);
        for (String column : columnNames) {
          // A NULL cell yields a null value; emit JSON null instead of failing on toString().
          // NOTE(review): nested or repeated fields are still flattened via toString() — confirm
          // this is acceptable for the queries this is used with.
          Object value = row.get(column).getValue();
          String text = (value == null) ? null : value.toString();
          jsonObject.addProperty(column, text);
        }
        jsonArray.add(jsonObject);
      });

      // Write one JSON object per line (NDJSON). The writer is closed even when a write fails,
      // and a write failure now propagates instead of leaving a truncated file to be loaded.
      // UTF-8 is requested explicitly so the output does not depend on the platform charset.
      // Fully qualified names are used because the simple name Files is already taken by the
      // Guava import used for the upload below.
      Path jsonPath = FileSystems.getDefault().getPath(".", TEMP_FILE_NAME);
      try (java.io.Writer out = java.nio.file.Files.newBufferedWriter(
          jsonPath, java.nio.charset.StandardCharsets.UTF_8)) {
        for (int i = 0; i < jsonArray.size(); i++) {
          out.write(jsonArray.get(i).toString());
          out.write("\n");
        }
      }
      System.out.println("Data written to temporary file.");

      // Create a load job to insert data
      // TODO: Change the destination dataset and table information.
      String datasetName = "MY_DATASET_NAME";
      String tableName = "MY_TABLE_NAME";
      insertDataIntoDestinationTable(datasetName, tableName, jsonPath, FormatOptions.json());
    } catch (BigQueryException e) {
      System.out.println("Query did not run \n" + e.toString());
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can still observe the interruption.
      Thread.currentThread().interrupt();
      System.out.println("Query did not run \n" + e.toString());
    }
  }

  /**
   * Uploads a local file into a BigQuery table through a load job and waits for the job to end.
   *
   * <p>BigQuery failures are reported on standard output rather than thrown.
   *
   * @param datasetName dataset containing the destination table
   * @param tableName destination table
   * @param jsonPath local file to upload
   * @param formatOptions format of the local file
   * @throws InterruptedException if the thread is interrupted while waiting for the job
   * @throws IOException if the local file cannot be read or streamed
   */
  private static void insertDataIntoDestinationTable(String datasetName, String tableName, Path jsonPath, FormatOptions formatOptions) throws InterruptedException, IOException {
    try {
      // Initialize client that will be used to send requests. This client only needs to be
      // created once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
      TableId tableId = TableId.of(datasetName, tableName);
      WriteChannelConfiguration writeChannelConfiguration =
          WriteChannelConfiguration.newBuilder(tableId).setFormatOptions(formatOptions).build();

      // The location and JobName must be specified; other fields can be auto-detected.
      String jobName = "jobId_" + UUID.randomUUID().toString();
      JobId jobId = JobId.newBuilder().setLocation("us").setJob(jobName).build();

      // Imports a local file into a table.
      try (TableDataWriteChannel writer = bigquery.writer(jobId, writeChannelConfiguration);
          OutputStream stream = Channels.newOutputStream(writer)) {
        Files.copy(jsonPath.toFile(), stream);
      }

      // Get the Job created by the TableDataWriteChannel and wait for it to complete.
      Job job = bigquery.getJob(jobId);
      Job completedJob = (job == null) ? null : job.waitFor();
      if (completedJob == null) {
        System.out.println("Job not executed since it no longer exists.");
        return;
      }
      if (completedJob.getStatus().getError() != null) {
        // Read the error from the completed job; the handle fetched before waiting may not
        // carry the final status.
        System.out.println(
            "BigQuery was unable to load local file to the table due to an error: \n"
                + completedJob.getStatus().getError());
      }
    } catch (BigQueryException e) {
      System.out.println("Local file not loaded. \n" + e.toString());
    }
  }
}
输出:
查询结果已成功插入目标table。