如何使用 BigQuery api 将 spark 连接到 BigQuery
How to connect spark to BigQuery using BigQuery api
我是 gcloud 和 BigQuery 的新手,想使用 spark 从 BigQuery 读取数据。
我使用 Google APIs Client Library for Java. 并且能够连接到 BigQuery。
我得到 com.google.api.services.bigquery.Bigquery 对象并能够打印读取的数据集、tableId 和 tableData
我的问题是
我如何将这个 BigQuery 身份验证对象(凭据对象)连接到 spark 或者是否可以将这个对象与 hadoopApi 一起使用
如果没有可能,如何将凭证对象传递给 newHadoopAPi
GoogleAuthorizationCodeFlow flow = getFlow();
GoogleTokenResponse response = flow.newTokenRequest(authorizationCode)
.setRedirectUri(REDIRECT_URI).execute();
Credential credential=flow.createAndStoreCredential(response, null);
return credential;
我的 Hadoop api 代码是我想使用我的凭证对象的地方
val tableData = sc.newAPIHadoopRDD(
conf,
classOf[GsonBigQueryInputFormat],
classOf[LongWritable],
classOf[JsonObject]).
我认为适用于 Hadoop 的 BigQuery 连接器可能会解决您的问题,而无需您编写自己的低级客户端。检查一下:https://cloud.google.com/hadoop/bigquery-connector
下面是一个使用它将 Spark 连接到 BigQuery 的示例:https://cloud.google.com/hadoop/examples/bigquery-connector-spark-example
感谢@michael 在您的帮助下 link 我找到了解决方案
只需在 hadoop 配置上禁用服务帐户
hadoopConfiguration.set("fs.gs.auth.service.account.enable", "false")
下面的代码将起作用
val hadoopConfiguration = sc.hadoopConfiguration
//BigQueryConfiguration.
hadoopConfiguration.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
hadoopConfiguration.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);
hadoopConfiguration.set("fs.gs.project.id", projectId);
hadoopConfiguration.set("fs.gs.auth.service.account.enable", "false")
hadoopConfiguration.set("fs.gs.auth.client.id",
clientId)
hadoopConfiguration.set("fs.gs.auth.client.secret",
clientSecret)
hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
hadoopConfiguration.set("fs.gs.auth.client.file", tokenPath);
hadoopConfiguration.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket)
// Configure input and output for BigQuery access.
com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.configureBigQueryInput(hadoopConfiguration, dataSetId + "." + tableId)
val tableData = sc.newAPIHadoopRDD(
hadoopConfiguration,
classOf[GsonBigQueryInputFormat],
classOf[LongWritable],
classOf[JsonObject])
其中令牌路径包含刷新令牌
{
"credentials": {
"user": {
"access_token": "ya29..wgL6fH2Gx5asdaadsBl2Trasd0sBqV_ZAS7xKDtNS0z4Qyv5ypassdh0soplQ",
"expiration_time_millis": 1460473581255,
"refresh_token": "XXXXXXXXXxxxxxxxxx"
}
}
}
我是 gcloud 和 BigQuery 的新手,想使用 spark 从 BigQuery 读取数据。 我使用 Google APIs Client Library for Java. 并且能够连接到 BigQuery。 我得到 com.google.api.services.bigquery.Bigquery 对象并能够打印读取的数据集、tableId 和 tableData
我的问题是
我如何将这个 BigQuery 身份验证对象(凭据对象)连接到 spark 或者是否可以将这个对象与 hadoopApi 一起使用
如果没有可能,如何将凭证对象传递给 newHadoopAPi
GoogleAuthorizationCodeFlow flow = getFlow();
GoogleTokenResponse response = flow.newTokenRequest(authorizationCode)
.setRedirectUri(REDIRECT_URI).execute();
Credential credential=flow.createAndStoreCredential(response, null);
return credential;
我的 Hadoop api 代码是我想使用我的凭证对象的地方
val tableData = sc.newAPIHadoopRDD(
conf,
classOf[GsonBigQueryInputFormat],
classOf[LongWritable],
classOf[JsonObject]).
我认为适用于 Hadoop 的 BigQuery 连接器可能会解决您的问题,而无需您编写自己的低级客户端。检查一下:https://cloud.google.com/hadoop/bigquery-connector
下面是一个使用它将 Spark 连接到 BigQuery 的示例:https://cloud.google.com/hadoop/examples/bigquery-connector-spark-example
感谢@michael 在您的帮助下 link 我找到了解决方案
只需在 hadoop 配置上禁用服务帐户
hadoopConfiguration.set("fs.gs.auth.service.account.enable", "false")
下面的代码将起作用
val hadoopConfiguration = sc.hadoopConfiguration
//BigQueryConfiguration.
hadoopConfiguration.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
hadoopConfiguration.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);
hadoopConfiguration.set("fs.gs.project.id", projectId);
hadoopConfiguration.set("fs.gs.auth.service.account.enable", "false")
hadoopConfiguration.set("fs.gs.auth.client.id",
clientId)
hadoopConfiguration.set("fs.gs.auth.client.secret",
clientSecret)
hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
hadoopConfiguration.set("fs.gs.auth.client.file", tokenPath);
hadoopConfiguration.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket)
// Configure input and output for BigQuery access.
com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.configureBigQueryInput(hadoopConfiguration, dataSetId + "." + tableId)
val tableData = sc.newAPIHadoopRDD(
hadoopConfiguration,
classOf[GsonBigQueryInputFormat],
classOf[LongWritable],
classOf[JsonObject])
其中令牌路径包含刷新令牌
{
"credentials": {
"user": {
"access_token": "ya29..wgL6fH2Gx5asdaadsBl2Trasd0sBqV_ZAS7xKDtNS0z4Qyv5ypassdh0soplQ",
"expiration_time_millis": 1460473581255,
"refresh_token": "XXXXXXXXXxxxxxxxxx"
}
}
}