Dataproc + BigQuery 示例 - 是否可用?

Dataproc + BigQuery examples - any available?

根据 Dataproc docos,它具有“与 BigQuery 的本机和自动集成”。

我在 BigQuery 中有一个 table。我想阅读 table 并使用我创建的 Dataproc 集群(使用 PySpark 作业)对其进行一些分析。然后将此分析的结果写回 BigQuery。您可能会问 "why not just do the analysis in BigQuery directly!?" - 原因是因为我们正在创建复杂的统计模型,而 SQL 级别太高,无法开发它们。我们需要像 Python 或 R 这样的东西,ergo Dataproc。

是否有可用的 Dataproc + BigQuery 示例?我找不到。

开始,如 the BigQuery connector is preinstalled on Cloud Dataproc 簇中所述。

这是一个关于如何将数据从 BigQuery 读取到 Spark 的示例。在此示例中,我们将从 BigQuery 读取数据以执行字数统计。 您使用 SparkContext.newAPIHadoopRDD 从 Spark 中的 BigQuery 读取数据。 Spark documentation 有更多关于使用 SparkContext.newAPIHadoopRDD 的信息。 '

import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
import com.google.cloud.hadoop.io.bigquery.mapred.BigQueryMapredInputFormat
import com.google.gson.JsonObject

import org.apache.hadoop.io.LongWritable

val projectId = "<your-project-id>"
val fullyQualifiedInputTableId = "publicdata:samples.shakespeare"
val fullyQualifiedOutputTableId = "<your-fully-qualified-table-id>"
val outputTableSchema =
    "[{'name': 'Word','type': 'STRING'},{'name': 'Count','type': 'INTEGER'}]"
val jobName = "wordcount"

val conf = sc.hadoopConfiguration

// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)

// Use the systemBucket for temporary BigQuery export data used by the InputFormat.
val systemBucket = conf.get("fs.gs.system.bucket")
conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, systemBucket)

// Configure input and output for BigQuery access.
BigQueryConfiguration.configureBigQueryInput(conf, fullyQualifiedInputTableId)
BigQueryConfiguration.configureBigQueryOutput(conf,
    fullyQualifiedOutputTableId, outputTableSchema)

val fieldName = "word"

val tableData = sc.newAPIHadoopRDD(conf,
    classOf[GsonBigQueryInputFormat], classOf[LongWritable], classOf[JsonObject])
tableData.cache()
tableData.count()
tableData.map(entry => (entry._1.toString(),entry._2.toString())).take(10)

您需要使用您的设置自定义此示例,包括 <your-project-id> 中的 Cloud Platform 项目 ID 和 <your-fully-qualified-table-id> 中的输出 table ID。

最后,如果您最终将 BigQuery 连接器与 MapReduce 结合使用,this page 提供了有关如何使用 BigQuery 连接器编写 MapReduce 作业的示例。


以上示例未显示如何将数据写入输出 table。您需要这样做:

.saveAsNewAPIHadoopFile(
hadoopConf.get(BigQueryConfiguration.TEMP_GCS_PATH_KEY), 
classOf[String], 
classOf[JsonObject], 
classOf[BigQueryOutputFormat[String, JsonObject]], hadoopConf)

其中 key: String 实际上被忽略了