使用 Spark (1.6) 从 Scala 中 Dataframe 的数组列中删除空值
Remove Null from Array Columns in Dataframe in Scala with Spark (1.6)
我有一个数据框,其中包含一个键列和一个包含结构数组的列。架构如下所示。
root
|-- id: string (nullable = true)
|-- desc: array (nullable = false)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- age: long (nullable = false)
数组"desc" 可以有任意数量的空值。我想使用 spark 1.6 创建一个具有空值 none 的数组的最终数据框:
例如:
Key . Value
1010 . [[George,21],null,[MARIE,13],null]
1023 . [null,[Watson,11],[John,35],null,[Kyle,33]]
我想要最终数据框为:
Key . Value
1010 . [[George,21],[MARIE,13]]
1023 . [[Watson,11],[John,35],[Kyle,33]]
我尝试用 UDF 和 case class 这样做,但得到
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to....
非常感谢任何帮助,如果需要,我更愿意在不转换为 RDD 的情况下这样做。我也是 spark 和 scala 的新手,所以在此先感谢!!!
鉴于原始数据框具有以下架构
root
|-- id: string (nullable = true)
|-- desc: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- age: long (nullable = false)
定义一个 udf
函数 以从数组中删除空值应该 对你有效
import org.apache.spark.sql.functions._
def removeNull = udf((array: Seq[Row])=> array.filterNot(_ == null).map(x => element(x.getAs[String]("name"), x.getAs[Long]("age"))))
df.withColumn("desc", removeNull(col("desc")))
其中 element
是 case class
case class element(name: String, age: Long)
你应该得到
+----+-----------------------------------+
|id |desc |
+----+-----------------------------------+
|1010|[[George,21], [MARIE,13]] |
|1010|[[Watson,11], [John,35], [Kyle,33]]|
+----+-----------------------------------+
这是另一个版本:
case class Person(name: String, age: Int)
root
|-- id: string (nullable = true)
|-- desc: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- age: integer (nullable = false)
+----+-----------------------------------------------+
|id |desc |
+----+-----------------------------------------------+
|1010|[[George,21], null, [MARIE,13], null] |
|1023|[[Watson,11], null, [John,35], null, [Kyle,33]]|
+----+-----------------------------------------------+
val filterOutNull = udf((xs: Seq[Row]) => {
xs.flatMap {
case null => Nil
// convert the Row back to your specific struct:
case Row(s: String,i: Int) => List(Person(s, i))
}
})
val result = df.withColumn("filteredListDesc", filterOutNull($"desc"))
+----+-----------------------------------------------+-----------------------------------+
|id |desc |filteredListDesc |
+----+-----------------------------------------------+-----------------------------------+
|1010|[[George,21], null, [MARIE,13], null] |[[George,21], [MARIE,13]] |
|1023|[[Watson,11], null, [John,35], null, [Kyle,33]]|[[Watson,11], [John,35], [Kyle,33]]|
+----+-----------------------------------------------+-----------------------------------+
我有一个数据框,其中包含一个键列和一个包含结构数组的列。架构如下所示。
root
|-- id: string (nullable = true)
|-- desc: array (nullable = false)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- age: long (nullable = false)
数组"desc" 可以有任意数量的空值。我想使用 spark 1.6 创建一个具有空值 none 的数组的最终数据框:
例如:
Key . Value
1010 . [[George,21],null,[MARIE,13],null]
1023 . [null,[Watson,11],[John,35],null,[Kyle,33]]
我想要最终数据框为:
Key . Value
1010 . [[George,21],[MARIE,13]]
1023 . [[Watson,11],[John,35],[Kyle,33]]
我尝试用 UDF 和 case class 这样做,但得到
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to....
非常感谢任何帮助,如果需要,我更愿意在不转换为 RDD 的情况下这样做。我也是 spark 和 scala 的新手,所以在此先感谢!!!
鉴于原始数据框具有以下架构
root
|-- id: string (nullable = true)
|-- desc: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- age: long (nullable = false)
定义一个 udf
函数 以从数组中删除空值应该 对你有效
import org.apache.spark.sql.functions._
def removeNull = udf((array: Seq[Row])=> array.filterNot(_ == null).map(x => element(x.getAs[String]("name"), x.getAs[Long]("age"))))
df.withColumn("desc", removeNull(col("desc")))
其中 element
是 case class
case class element(name: String, age: Long)
你应该得到
+----+-----------------------------------+
|id |desc |
+----+-----------------------------------+
|1010|[[George,21], [MARIE,13]] |
|1010|[[Watson,11], [John,35], [Kyle,33]]|
+----+-----------------------------------+
这是另一个版本:
case class Person(name: String, age: Int)
root
|-- id: string (nullable = true)
|-- desc: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- age: integer (nullable = false)
+----+-----------------------------------------------+
|id |desc |
+----+-----------------------------------------------+
|1010|[[George,21], null, [MARIE,13], null] |
|1023|[[Watson,11], null, [John,35], null, [Kyle,33]]|
+----+-----------------------------------------------+
val filterOutNull = udf((xs: Seq[Row]) => {
xs.flatMap {
case null => Nil
// convert the Row back to your specific struct:
case Row(s: String,i: Int) => List(Person(s, i))
}
})
val result = df.withColumn("filteredListDesc", filterOutNull($"desc"))
+----+-----------------------------------------------+-----------------------------------+
|id |desc |filteredListDesc |
+----+-----------------------------------------------+-----------------------------------+
|1010|[[George,21], null, [MARIE,13], null] |[[George,21], [MARIE,13]] |
|1023|[[Watson,11], null, [John,35], null, [Kyle,33]]|[[Watson,11], [John,35], [Kyle,33]]|
+----+-----------------------------------------------+-----------------------------------+