使用 pyspark 脚本将 table 从 bigquery 加载到 spark 集群
load table from bigquery to spark cluster with pyspark script
我在 bigquery 中加载了一个数据 table,我想通过 pyspark .py 文件将其导入到我的 spark 集群中。
我在 中看到有一种方法可以使用 scala 在 spark 集群中加载一个 bigquery table,但是有没有一种方法可以在 pyspark 脚本中完成它?
这来自@MattJ 在this question。下面是在 Spark 中连接到 BigQuery 并执行字数统计的示例。
import json
import pyspark
sc = pyspark.SparkContext()
hadoopConf=sc._jsc.hadoopConfiguration()
hadoopConf.get("fs.gs.system.bucket")
conf = {"mapred.bq.project.id": "<project_id>", "mapred.bq.gcs.bucket": "<bucket>",
"mapred.bq.input.project.id": "publicdata",
"mapred.bq.input.dataset.id":"samples",
"mapred.bq.input.table.id": "shakespeare" }
tableData = sc.newAPIHadoopRDD(
"com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
"org.apache.hadoop.io.LongWritable", "com.google.gson.JsonObject",
conf=conf).map(lambda k: json.loads(k[1])).map(lambda x: (x["word"],
int(x["word_count"]))).reduceByKey(lambda x,y: x+y)
print tableData.take(10)
您需要更改 <project_id>
和 <bucket>
以匹配您项目的设置。
我在 bigquery 中加载了一个数据 table,我想通过 pyspark .py 文件将其导入到我的 spark 集群中。
我在
这来自@MattJ 在this question。下面是在 Spark 中连接到 BigQuery 并执行字数统计的示例。
import json
import pyspark
sc = pyspark.SparkContext()
hadoopConf=sc._jsc.hadoopConfiguration()
hadoopConf.get("fs.gs.system.bucket")
conf = {"mapred.bq.project.id": "<project_id>", "mapred.bq.gcs.bucket": "<bucket>",
"mapred.bq.input.project.id": "publicdata",
"mapred.bq.input.dataset.id":"samples",
"mapred.bq.input.table.id": "shakespeare" }
tableData = sc.newAPIHadoopRDD(
"com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
"org.apache.hadoop.io.LongWritable", "com.google.gson.JsonObject",
conf=conf).map(lambda k: json.loads(k[1])).map(lambda x: (x["word"],
int(x["word_count"]))).reduceByKey(lambda x,y: x+y)
print tableData.take(10)
您需要更改 <project_id>
和 <bucket>
以匹配您项目的设置。