不使用 Google 云存储将 BigQuery 数据导出到 CSV
Export BigQuery Data to CSV without using Google Cloud Storage
我目前正在写一个软件,用于导出大量的BigQuery数据,并将查询结果存储在本地为CSV文件。我用的是Python3,客户端是google提供的。我进行了配置和身份验证,但问题是我无法在本地存储数据。每次执行时,我都会收到以下 错误消息 :
googleapiclient.errors.HttpError: https://www.googleapis.com/bigquery/v2/projects/round-office-769/jobs?alt=json returned "Invalid extract destination URI 'response/file-name-*.csv'. Must be a valid Google Storage path.">
这是我的工作配置:
def export_table(service, cloud_storage_path,
projectId, datasetId, tableId, sqlQuery,
export_format="CSV",
num_retries=5):
# Generate a unique job_id so retries
# don't accidentally duplicate export
job_data = {
'jobReference': {
'projectId': projectId,
'jobId': str(uuid.uuid4())
},
'configuration': {
'extract': {
'sourceTable': {
'projectId': projectId,
'datasetId': datasetId,
'tableId': tableId,
},
'destinationUris': ['response/file-name-*.csv'],
'destinationFormat': export_format
},
'query': {
'query': sqlQuery,
}
}
}
return service.jobs().insert(
projectId=projectId,
body=job_data).execute(num_retries=num_retries)
我希望我可以只使用本地路径而不是云存储来存储数据,但我错了。
所以我的问题是:
我可以将查询的数据下载到本地(或本地数据库)还是必须使用 Google 云存储?
您需要使用 Google 云存储进行导出作业。解释了从 BigQuery 导出数据 here,还检查了不同路径语法的变体。
然后您可以将文件从 GCS 下载到您的本地存储。
Gsutil 工具可以帮助您进一步将文件从 GCS 下载到本地机器。
本地不能一键下载,需要先导出到GCS,再传到本地
您可以 运行 对该 table 执行 tabledata.list() 操作并设置 "alt=csv" 这将 return [=15= 的开头] 作为 CSV。
您可以使用分页机制直接下载所有数据(无需通过 Google 云存储路由)。基本上你需要为每个页面生成一个页面令牌,下载页面中的数据并迭代它直到所有数据都已下载,即没有更多令牌可用。这是 Java 中的示例代码,希望能阐明这个想法:
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.client.util.Data;
import com.google.api.services.bigquery.model.*;
/* your class starts here */
private String projectId = ""; /* fill in the project id here */
private String query = ""; /* enter your query here */
private Bigquery bigQuery;
private Job insert;
private TableDataList tableDataList;
private Iterator<TableRow> rowsIterator;
private List<TableRow> rows;
private long maxResults = 100000L; /* max number of rows in a page */
/* run query */
public void open() throws Exception {
HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
JsonFactory jsonFactory = new JacksonFactory();
GoogleCredential credential = GoogleCredential.getApplicationDefault(transport, jsonFactory);
if (credential.createScopedRequired())
credential = credential.createScoped(BigqueryScopes.all());
bigQuery = new Bigquery.Builder(transport, jsonFactory, credential).setApplicationName("my app").build();
JobConfigurationQuery queryConfig = new JobConfigurationQuery().setQuery(query);
JobConfiguration jobConfig = new JobConfiguration().setQuery(queryConfig);
Job job = new Job().setConfiguration(jobConfig);
insert = bigQuery.jobs().insert(projectId, job).execute();
JobReference jobReference = insert.getJobReference();
while (true) {
Job poll = bigQuery.jobs().get(projectId, jobReference.getJobId()).execute();
String state = poll.getStatus().getState();
if ("DONE".equals(state)) {
ErrorProto errorResult = poll.getStatus().getErrorResult();
if (errorResult != null)
throw new Exception("Error running job: " + poll.getStatus().getErrors().get(0));
break;
}
Thread.sleep(10000);
}
tableDataList = getPage();
rows = tableDataList.getRows();
rowsIterator = rows != null ? rows.iterator() : null;
}
/* read data row by row */
public /* your data object here */ read() throws Exception {
if (rowsIterator == null) return null;
if (!rowsIterator.hasNext()) {
String pageToken = tableDataList.getPageToken();
if (pageToken == null) return null;
tableDataList = getPage(pageToken);
rows = tableDataList.getRows();
if (rows == null) return null;
rowsIterator = rows.iterator();
}
TableRow row = rowsIterator.next();
for (TableCell cell : row.getF()) {
Object value = cell.getV();
/* extract the data here */
}
/* return the data */
}
private TableDataList getPage() throws IOException {
return getPage(null);
}
private TableDataList getPage(String pageToken) throws IOException {
TableReference sourceTable = insert
.getConfiguration()
.getQuery()
.getDestinationTable();
if (sourceTable == null)
throw new IllegalArgumentException("Source table not available. Please check the query syntax.");
return bigQuery.tabledata()
.list(projectId, sourceTable.getDatasetId(), sourceTable.getTableId())
.setPageToken(pageToken)
.setMaxResults(maxResults)
.execute();
}
另一种方法是使用 UI,查询结果返回后,您可以 select 单击 "Download as CSV" 按钮。
如果您安装 Google BigQuery API 和 pandas 以及 pandas.io,您可以 运行 Python 在 Jupyter notebook 中,查询 BQ Table,并将数据放入本地数据帧。从那里,您可以将其写成 CSV。
正如Mikhail Berlyant所说,
BigQuery does not provide ability to directly export/download query
result to GCS or Local File.
您仍然可以使用 Web 将其导出 UI,只需三个步骤
- 配置查询以将结果保存在 BigQuery table 和 运行 中。
- 将 table 导出到 GCS 中的存储桶。
- 从存储桶中下载。
为确保成本保持在较低水平,只需确保在将内容导出到 GCS 后删除 table 并在将文件下载到您的存储桶后删除存储桶中的内容机.
步骤 1
在 BigQuery 屏幕中,运行查询前转到更多 > 查询设置
这将打开以下内容
这里有你想要的
- 目的地:为查询结果设置一个目的地table
- 项目名称:select项目。
- 数据集名称:select一个数据集。如果您没有,请创建它并返回。
- Table name: 任意命名(只能包含字母、数字或下划线)。
- 结果大小:允许较大的结果(无大小限制)。
然后保存它,查询被配置为保存在特定的table。现在您可以 运行 查询。
第 2 步
要将其导出到 GCP,您必须转到 table 并单击“导出”>“导出到 GCS”。
这将打开以下屏幕
在 Select GCS 位置 中定义存储桶、文件夹和文件。
例如,您有一个名为 daria_bucket 的存储桶(只能使用小写字母、数字、连字符 (-) 和下划线 (_ ). 点 (.) 可用于形成有效的域名。) 并希望将文件保存在存储桶的根目录中,名称为 test,然后你写(在 Select GCS 位置)
daria_bucket/test.csv
如果文件太大(超过 1 GB),您将收到错误消息。要修复它,您必须使用通配符将其保存在更多文件中。所以,你需要添加*,就像那样
daria_bucket/test*.csv
这将在存储桶 daria_bucket 内存储从 table 中提取的所有数据,这些数据会存储在多个名为 test000000000000、test000000000001、test000000000002、... testX 的文件中。
步骤 3
然后转到存储,您会看到存储桶。
进入其中,您会找到一个(或多个)文件。然后您可以从那里下载。
使用 Python 从 BigQuery table 将数据导出到 CSV 文件 pandas:
import pandas as pd
from google.cloud import bigquery
selectQuery = """SELECT * FROM dataset-name.table-name"""
bigqueryClient = bigquery.Client()
df = bigqueryClient.query(selectQuery).to_dataframe()
df.to_csv("file-name.csv", index=False)
也许您可以使用Google提供的simba odbc驱动程序,并使用任何提供odbc连接的工具来创建csv。它甚至可以是 microsoft ssis,你甚至不需要编码。
我目前正在写一个软件,用于导出大量的BigQuery数据,并将查询结果存储在本地为CSV文件。我用的是Python3,客户端是google提供的。我进行了配置和身份验证,但问题是我无法在本地存储数据。每次执行时,我都会收到以下 错误消息 :
googleapiclient.errors.HttpError: https://www.googleapis.com/bigquery/v2/projects/round-office-769/jobs?alt=json returned "Invalid extract destination URI 'response/file-name-*.csv'. Must be a valid Google Storage path.">
这是我的工作配置:
def export_table(service, cloud_storage_path,
projectId, datasetId, tableId, sqlQuery,
export_format="CSV",
num_retries=5):
# Generate a unique job_id so retries
# don't accidentally duplicate export
job_data = {
'jobReference': {
'projectId': projectId,
'jobId': str(uuid.uuid4())
},
'configuration': {
'extract': {
'sourceTable': {
'projectId': projectId,
'datasetId': datasetId,
'tableId': tableId,
},
'destinationUris': ['response/file-name-*.csv'],
'destinationFormat': export_format
},
'query': {
'query': sqlQuery,
}
}
}
return service.jobs().insert(
projectId=projectId,
body=job_data).execute(num_retries=num_retries)
我希望我可以只使用本地路径而不是云存储来存储数据,但我错了。
所以我的问题是:
我可以将查询的数据下载到本地(或本地数据库)还是必须使用 Google 云存储?
您需要使用 Google 云存储进行导出作业。解释了从 BigQuery 导出数据 here,还检查了不同路径语法的变体。
然后您可以将文件从 GCS 下载到您的本地存储。
Gsutil 工具可以帮助您进一步将文件从 GCS 下载到本地机器。
本地不能一键下载,需要先导出到GCS,再传到本地
您可以 运行 对该 table 执行 tabledata.list() 操作并设置 "alt=csv" 这将 return [=15= 的开头] 作为 CSV。
您可以使用分页机制直接下载所有数据(无需通过 Google 云存储路由)。基本上你需要为每个页面生成一个页面令牌,下载页面中的数据并迭代它直到所有数据都已下载,即没有更多令牌可用。这是 Java 中的示例代码,希望能阐明这个想法:
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.client.util.Data;
import com.google.api.services.bigquery.model.*;
/* your class starts here */
private String projectId = ""; /* fill in the project id here */
private String query = ""; /* enter your query here */
private Bigquery bigQuery;
private Job insert;
private TableDataList tableDataList;
private Iterator<TableRow> rowsIterator;
private List<TableRow> rows;
private long maxResults = 100000L; /* max number of rows in a page */
/* run query */
public void open() throws Exception {
HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
JsonFactory jsonFactory = new JacksonFactory();
GoogleCredential credential = GoogleCredential.getApplicationDefault(transport, jsonFactory);
if (credential.createScopedRequired())
credential = credential.createScoped(BigqueryScopes.all());
bigQuery = new Bigquery.Builder(transport, jsonFactory, credential).setApplicationName("my app").build();
JobConfigurationQuery queryConfig = new JobConfigurationQuery().setQuery(query);
JobConfiguration jobConfig = new JobConfiguration().setQuery(queryConfig);
Job job = new Job().setConfiguration(jobConfig);
insert = bigQuery.jobs().insert(projectId, job).execute();
JobReference jobReference = insert.getJobReference();
while (true) {
Job poll = bigQuery.jobs().get(projectId, jobReference.getJobId()).execute();
String state = poll.getStatus().getState();
if ("DONE".equals(state)) {
ErrorProto errorResult = poll.getStatus().getErrorResult();
if (errorResult != null)
throw new Exception("Error running job: " + poll.getStatus().getErrors().get(0));
break;
}
Thread.sleep(10000);
}
tableDataList = getPage();
rows = tableDataList.getRows();
rowsIterator = rows != null ? rows.iterator() : null;
}
/* read data row by row */
public /* your data object here */ read() throws Exception {
if (rowsIterator == null) return null;
if (!rowsIterator.hasNext()) {
String pageToken = tableDataList.getPageToken();
if (pageToken == null) return null;
tableDataList = getPage(pageToken);
rows = tableDataList.getRows();
if (rows == null) return null;
rowsIterator = rows.iterator();
}
TableRow row = rowsIterator.next();
for (TableCell cell : row.getF()) {
Object value = cell.getV();
/* extract the data here */
}
/* return the data */
}
private TableDataList getPage() throws IOException {
return getPage(null);
}
private TableDataList getPage(String pageToken) throws IOException {
TableReference sourceTable = insert
.getConfiguration()
.getQuery()
.getDestinationTable();
if (sourceTable == null)
throw new IllegalArgumentException("Source table not available. Please check the query syntax.");
return bigQuery.tabledata()
.list(projectId, sourceTable.getDatasetId(), sourceTable.getTableId())
.setPageToken(pageToken)
.setMaxResults(maxResults)
.execute();
}
另一种方法是使用 UI,查询结果返回后,您可以 select 单击 "Download as CSV" 按钮。
如果您安装 Google BigQuery API 和 pandas 以及 pandas.io,您可以 运行 Python 在 Jupyter notebook 中,查询 BQ Table,并将数据放入本地数据帧。从那里,您可以将其写成 CSV。
正如Mikhail Berlyant所说,
BigQuery does not provide ability to directly export/download query result to GCS or Local File.
您仍然可以使用 Web 将其导出 UI,只需三个步骤
- 配置查询以将结果保存在 BigQuery table 和 运行 中。
- 将 table 导出到 GCS 中的存储桶。
- 从存储桶中下载。
为确保成本保持在较低水平,只需确保在将内容导出到 GCS 后删除 table 并在将文件下载到您的存储桶后删除存储桶中的内容机.
步骤 1
在 BigQuery 屏幕中,运行查询前转到更多 > 查询设置
这将打开以下内容
这里有你想要的
- 目的地:为查询结果设置一个目的地table
- 项目名称:select项目。
- 数据集名称:select一个数据集。如果您没有,请创建它并返回。
- Table name: 任意命名(只能包含字母、数字或下划线)。
- 结果大小:允许较大的结果(无大小限制)。
然后保存它,查询被配置为保存在特定的table。现在您可以 运行 查询。
第 2 步
要将其导出到 GCP,您必须转到 table 并单击“导出”>“导出到 GCS”。
这将打开以下屏幕
在 Select GCS 位置 中定义存储桶、文件夹和文件。
例如,您有一个名为 daria_bucket 的存储桶(只能使用小写字母、数字、连字符 (-) 和下划线 (_ ). 点 (.) 可用于形成有效的域名。) 并希望将文件保存在存储桶的根目录中,名称为 test,然后你写(在 Select GCS 位置)
daria_bucket/test.csv
如果文件太大(超过 1 GB),您将收到错误消息。要修复它,您必须使用通配符将其保存在更多文件中。所以,你需要添加*,就像那样
daria_bucket/test*.csv
这将在存储桶 daria_bucket 内存储从 table 中提取的所有数据,这些数据会存储在多个名为 test000000000000、test000000000001、test000000000002、... testX 的文件中。
步骤 3
然后转到存储,您会看到存储桶。
进入其中,您会找到一个(或多个)文件。然后您可以从那里下载。
使用 Python 从 BigQuery table 将数据导出到 CSV 文件 pandas:
import pandas as pd
from google.cloud import bigquery
selectQuery = """SELECT * FROM dataset-name.table-name"""
bigqueryClient = bigquery.Client()
df = bigqueryClient.query(selectQuery).to_dataframe()
df.to_csv("file-name.csv", index=False)
也许您可以使用Google提供的simba odbc驱动程序,并使用任何提供odbc连接的工具来创建csv。它甚至可以是 microsoft ssis,你甚至不需要编码。