使用 SPARK (SQL) 删除不必要的 JSON 字段

Removing Unnecessary JSON fields using SPARK (SQL)

我是一个新的 spark 用户,目前正在使用 Spark 和一些大数据,我有一个与 Spark SQL 或更正式的 SchemaRDD 相关的问题。我正在阅读一个 JSON 文件,其中包含有关某些天气预报的数据,但我对我拥有的所有字段并不真正感兴趣……我只想为每条记录返回 50 多个字段中的 10 个字段。有没有一种方法(类似于过滤器)可以用来指定我想从 spark 中删除的某些字段的名称。

只是一个描述性的小例子。假设我有包含 3 个字段 "Name"、"Age" 和 "Gender" 的架构 "Person",我对 "Age" 字段不感兴趣,并且想删除它。我可以使用 spark 一些方法来做到这一点吗? ?谢谢

您可以指定您希望在 schemaRDD 中包含哪些字段。下面是一个例子。创建一个案例 class,仅包含您需要的字段。将数据读入 rdd,然后仅指定您需要的字段(与在案例 class 中指定模式的顺序相同)。

Sample Data: People.txt
foo,25,M
bar,24,F

代码:

case class Person(name: String, gender: String)
val people = sc.textFile("People.txt").map(_.split(",")).map(p => Person(p(0), p(2)))
people.registerTempTable("people")

如果您使用的是 Spark 1.2,您可以执行以下操作(使用 Scala)...

如果您已经知道要使用哪些字段,则可以为这些字段构建架构并将此架构应用于 JSON 数据集。 Spark SQL 将 return 一个 SchemaRDD。然后,您可以将其注册并查询为 table。这是一个片段...

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
// The schema is encoded in a string
val schemaString = "name gender"
// Import Spark SQL data types.
import org.apache.spark.sql._
// Generate the schema based on the string of schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Create the SchemaRDD for your JSON file "people" (every line of this file is a JSON object).
val peopleSchemaRDD = sqlContext.jsonFile("people.txt", schema)
// Check the schema of peopleSchemaRDD
peopleSchemaRDD.printSchema()
// Register peopleSchemaRDD as a table called "people"
peopleSchemaRDD.registerTempTable("people")
// Only values of name and gender fields will be in the results.
val results = sqlContext.sql("SELECT * FROM people")

当您查看 peopleSchemaRDD (peopleSchemaRDD.printSchema()) 的模式时,您只会看到姓名和性别字段。

或者,如果您想探索数据集并在看到所有字段后确定您想要的字段,您可以让 Spark SQL 为您推断架构。然后,您可以将 SchemaRDD 注册为 table 并使用投影来删除不需要的字段。这是一个片段...

// Spark SQL will infer the schema of the given JSON file.
val peopleSchemaRDD = sqlContext.jsonFile("people.txt")
// Check the schema of peopleSchemaRDD
peopleSchemaRDD.printSchema()
// Register peopleSchemaRDD as a table called "people"
peopleSchemaRDD.registerTempTable("people")
// Project name and gender field.
sqlContext.sql("SELECT name, gender FROM people")