How to collect Spark SQL output to a file?
Below is my Spark SQL script. It loads a file and runs SQL over it; I want to collect the output of the SQL query and write it to a file, but I don't know how. Any help is appreciated.
//import classes for sql
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
import sqlContext.createSchemaRDD
//hdfs paths
val warehouse="hdfs://quickstart.cloudera/user/hive/warehouse/"
val customers_path=warehouse+"people/people.txt"
customers_path
//create rdd file called file
val file=sc.textFile(customers_path)
val schemaString="name age"
import org.apache.spark.sql._
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
val rowRDD=file.map(_.split(",")).map(p => Row(p(0),p(1).trim))
val peopleSchemRDD=sqlContext.applySchema(rowRDD, schema)
// Register the SchemaRDD as a table.
peopleSchemRDD.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
sqlContext.sql("select count(*) from people").collect().foreach(println)
System.exit(0)
//import classes for sql
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
import sqlContext.createSchemaRDD
import sqlContext.implicits._
//hdfs paths
val warehouse="hdfs://quickstart.cloudera/user/hive/warehouse/"
val customers_path=warehouse+"people/people.txt"
customers_path
//create rdd file called file
val file=sc.textFile(customers_path)
val schemaString="name age"
import org.apache.spark.sql._
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
val rowRDD=file.map(_.split(",")).map(p => Row(p(0),p(1).trim))
val peopleSchemRDD=sqlContext.applySchema(rowRDD, schema)
// Register the SchemaRDD as a table.
peopleSchemRDD.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
// Collect the query result to the driver, re-parallelize it as an RDD,
// and save it as a text file on HDFS.
val op=sqlContext.sql("select count(*) from people")
val c=op.collect()
val rdd=sc.parallelize(c)
rdd.saveAsTextFile("/home/cloudera/op")
System.exit(0)
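A minimal sketch of an alternative, assuming the Spark 1.x SchemaRDD API used above: the result of sqlContext.sql is itself an RDD of Row, so it can be written out directly with saveAsTextFile, without collecting to the driver and re-parallelizing. The output path /home/cloudera/op_direct is only an example and must not already exist.
// Sketch only: assumes the "people" temp table registered above (Spark 1.x API).
val result = sqlContext.sql("select name, age from people")
// Render each Row as a comma-separated line before writing.
result.map(row => row.mkString(",")).saveAsTextFile("/home/cloudera/op_direct")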
If you only want to count the lines of a large file on HDFS and then write the count to another file:
import java.nio.file.{ Files, Paths }
val path = "hdfs://quickstart.cloudera/user/hive/warehouse/people/people.txt"
val rdd = sc.textFile(path)
val linesCount = rdd.count
Files.write(Paths.get("line_count.txt"), linesCount.toString.getBytes)
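Note that Files.write above creates line_count.txt on the driver's local filesystem, not on HDFS. As a rough sketch, if the count should land on HDFS instead, the single value can be wrapped in a one-element RDD and saved there (the output path below is only an example and must not already exist):
// Sketch: write the count to HDFS by wrapping it in a one-element RDD.
sc.parallelize(Seq(linesCount.toString)).saveAsTextFile("hdfs://quickstart.cloudera/user/cloudera/line_count")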
import java.io.{File, PrintWriter}
peopleSchemRDD.registerTempTable("people")
// Run the query, count the rows, and convert the count to a string.
val op=sqlContext.sql("select * from people").count().toString
// Write the count to a local text file ("path" is a placeholder).
val pw=new PrintWriter(new File("path"))
pw.write("count of people:"+op+"\n")
pw.close()
Register a temporary table named people, then write the query that produces the desired output; the count function counts the rows in the result, and the count is converted to a string.
A PrintWriter is then used to write this value, stored in op, to a text file.
If the people table contains duplicate values, use the distinct keyword in the SQL query to count only the unique values.
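A minimal sketch of the distinct variant mentioned above, assuming the same people temp table; the output file name distinct_count.txt is only an example:
import java.io.{File, PrintWriter}
// Count only distinct names and write the result to a local text file.
val distinctCount = sqlContext.sql("select distinct name from people").count().toString
val writer = new PrintWriter(new File("distinct_count.txt"))
writer.write("distinct names: " + distinctCount + "\n")
writer.close()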