Dataproc + BigQuery 示例 - 是否可用?
Dataproc + BigQuery examples - any available?
根据 Dataproc docos,它具有“与 BigQuery 的本机和自动集成”。
我在 BigQuery 中有一个 table。我想阅读 table 并使用我创建的 Dataproc 集群(使用 PySpark 作业)对其进行一些分析。然后将此分析的结果写回 BigQuery。您可能会问 "why not just do the analysis in BigQuery directly!?" - 原因是因为我们正在创建复杂的统计模型,而 SQL 级别太高,无法开发它们。我们需要像 Python 或 R 这样的东西,ergo Dataproc。
是否有可用的 Dataproc + BigQuery 示例?我找不到。
开始,如 the BigQuery connector is preinstalled on Cloud Dataproc 簇中所述。
这是一个关于如何将数据从 BigQuery 读取到 Spark 的示例。在此示例中,我们将从 BigQuery 读取数据以执行字数统计。
您使用 SparkContext.newAPIHadoopRDD
从 Spark 中的 BigQuery 读取数据。 Spark documentation 有更多关于使用 SparkContext.newAPIHadoopRDD
的信息。 '
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
import com.google.cloud.hadoop.io.bigquery.mapred.BigQueryMapredInputFormat
import com.google.gson.JsonObject
import org.apache.hadoop.io.LongWritable
val projectId = "<your-project-id>"
val fullyQualifiedInputTableId = "publicdata:samples.shakespeare"
val fullyQualifiedOutputTableId = "<your-fully-qualified-table-id>"
val outputTableSchema =
"[{'name': 'Word','type': 'STRING'},{'name': 'Count','type': 'INTEGER'}]"
val jobName = "wordcount"
val conf = sc.hadoopConfiguration
// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)
// Use the systemBucket for temporary BigQuery export data used by the InputFormat.
val systemBucket = conf.get("fs.gs.system.bucket")
conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, systemBucket)
// Configure input and output for BigQuery access.
BigQueryConfiguration.configureBigQueryInput(conf, fullyQualifiedInputTableId)
BigQueryConfiguration.configureBigQueryOutput(conf,
fullyQualifiedOutputTableId, outputTableSchema)
val fieldName = "word"
val tableData = sc.newAPIHadoopRDD(conf,
classOf[GsonBigQueryInputFormat], classOf[LongWritable], classOf[JsonObject])
tableData.cache()
tableData.count()
tableData.map(entry => (entry._1.toString(),entry._2.toString())).take(10)
您需要使用您的设置自定义此示例,包括 <your-project-id>
中的 Cloud Platform 项目 ID 和 <your-fully-qualified-table-id>
中的输出 table ID。
最后,如果您最终将 BigQuery 连接器与 MapReduce 结合使用,this page 提供了有关如何使用 BigQuery 连接器编写 MapReduce 作业的示例。
以上示例未显示如何将数据写入输出 table。您需要这样做:
.saveAsNewAPIHadoopFile(
hadoopConf.get(BigQueryConfiguration.TEMP_GCS_PATH_KEY),
classOf[String],
classOf[JsonObject],
classOf[BigQueryOutputFormat[String, JsonObject]], hadoopConf)
其中 key: String 实际上被忽略了
根据 Dataproc docos,它具有“与 BigQuery 的本机和自动集成”。
我在 BigQuery 中有一个 table。我想阅读 table 并使用我创建的 Dataproc 集群(使用 PySpark 作业)对其进行一些分析。然后将此分析的结果写回 BigQuery。您可能会问 "why not just do the analysis in BigQuery directly!?" - 原因是因为我们正在创建复杂的统计模型,而 SQL 级别太高,无法开发它们。我们需要像 Python 或 R 这样的东西,ergo Dataproc。
是否有可用的 Dataproc + BigQuery 示例?我找不到。
开始,如
这是一个关于如何将数据从 BigQuery 读取到 Spark 的示例。在此示例中,我们将从 BigQuery 读取数据以执行字数统计。
您使用 SparkContext.newAPIHadoopRDD
从 Spark 中的 BigQuery 读取数据。 Spark documentation 有更多关于使用 SparkContext.newAPIHadoopRDD
的信息。 '
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
import com.google.cloud.hadoop.io.bigquery.mapred.BigQueryMapredInputFormat
import com.google.gson.JsonObject
import org.apache.hadoop.io.LongWritable
val projectId = "<your-project-id>"
val fullyQualifiedInputTableId = "publicdata:samples.shakespeare"
val fullyQualifiedOutputTableId = "<your-fully-qualified-table-id>"
val outputTableSchema =
"[{'name': 'Word','type': 'STRING'},{'name': 'Count','type': 'INTEGER'}]"
val jobName = "wordcount"
val conf = sc.hadoopConfiguration
// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)
// Use the systemBucket for temporary BigQuery export data used by the InputFormat.
val systemBucket = conf.get("fs.gs.system.bucket")
conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, systemBucket)
// Configure input and output for BigQuery access.
BigQueryConfiguration.configureBigQueryInput(conf, fullyQualifiedInputTableId)
BigQueryConfiguration.configureBigQueryOutput(conf,
fullyQualifiedOutputTableId, outputTableSchema)
val fieldName = "word"
val tableData = sc.newAPIHadoopRDD(conf,
classOf[GsonBigQueryInputFormat], classOf[LongWritable], classOf[JsonObject])
tableData.cache()
tableData.count()
tableData.map(entry => (entry._1.toString(),entry._2.toString())).take(10)
您需要使用您的设置自定义此示例,包括 <your-project-id>
中的 Cloud Platform 项目 ID 和 <your-fully-qualified-table-id>
中的输出 table ID。
最后,如果您最终将 BigQuery 连接器与 MapReduce 结合使用,this page 提供了有关如何使用 BigQuery 连接器编写 MapReduce 作业的示例。
以上示例未显示如何将数据写入输出 table。您需要这样做:
.saveAsNewAPIHadoopFile(
hadoopConf.get(BigQueryConfiguration.TEMP_GCS_PATH_KEY),
classOf[String],
classOf[JsonObject],
classOf[BigQueryOutputFormat[String, JsonObject]], hadoopConf)
其中 key: String 实际上被忽略了