如何使用 BigQuery api 将 spark 连接到 BigQuery

How to connect spark to BigQuery using BigQuery api

我是 gcloud 和 BigQuery 的新手,想使用 spark 从 BigQuery 读取数据。 我使用 Google APIs Client Library for Java. 并且能够连接到 BigQuery。 我得到 com.google.api.services.bigquery.Bigquery 对象并能够打印读取的数据集、tableId 和 tableData

我的问题是

我如何将这个 BigQuery 身份验证对象(凭据对象)连接到 spark 或者是否可以将这个对象与 hadoopApi 一起使用

如果没有可能,如何将凭证对象传递给 newHadoopAPi

GoogleAuthorizationCodeFlow flow = getFlow();
    GoogleTokenResponse response = flow.newTokenRequest(authorizationCode)
            .setRedirectUri(REDIRECT_URI).execute();
    Credential credential=flow.createAndStoreCredential(response, null);
    return credential; 

我的 Hadoop api 代码是我想使用我的凭证对象的地方

val tableData = sc.newAPIHadoopRDD(
  conf,
  classOf[GsonBigQueryInputFormat],
  classOf[LongWritable],
  classOf[JsonObject]).

我认为适用于 Hadoop 的 BigQuery 连接器可能会解决您的问题,而无需您编写自己的低级客户端。检查一下:https://cloud.google.com/hadoop/bigquery-connector

下面是一个使用它将 Spark 连接到 BigQuery 的示例:https://cloud.google.com/hadoop/examples/bigquery-connector-spark-example

感谢@michael 在您的帮助下 link 我找到了解决方案

只需在 hadoop 配置上禁用服务帐户

hadoopConfiguration.set("fs.gs.auth.service.account.enable", "false")

下面的代码将起作用

val hadoopConfiguration = sc.hadoopConfiguration
//BigQueryConfiguration.
hadoopConfiguration.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
hadoopConfiguration.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);
hadoopConfiguration.set("fs.gs.project.id", projectId);
hadoopConfiguration.set("fs.gs.auth.service.account.enable", "false")
hadoopConfiguration.set("fs.gs.auth.client.id",
  clientId)
hadoopConfiguration.set("fs.gs.auth.client.secret",
  clientSecret)
hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
hadoopConfiguration.set("fs.gs.auth.client.file", tokenPath);
hadoopConfiguration.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket)

// Configure input and output for BigQuery access.
com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.configureBigQueryInput(hadoopConfiguration, dataSetId + "." + tableId)
val tableData = sc.newAPIHadoopRDD(
  hadoopConfiguration,
  classOf[GsonBigQueryInputFormat],
  classOf[LongWritable],
  classOf[JsonObject])

其中令牌路径包含刷新令牌

{
    "credentials": {
        "user": {
            "access_token":     "ya29..wgL6fH2Gx5asdaadsBl2Trasd0sBqV_ZAS7xKDtNS0z4Qyv5ypassdh0soplQ",
            "expiration_time_millis": 1460473581255,
            "refresh_token": "XXXXXXXXXxxxxxxxxx"
            }
       }
}