将无效数据设置为 Spark DataFrames 中的缺失数据

Setting invalid data to missing data in Spark DataFrames

令 x 为定义为(在 Scala 中)

的两列字符串的数据帧
case class Pair(X: String, Y: String)

val x = sqlContext.createDataFrame(Seq(
   Pair("u1", "1"), 
   Pair("u2", "wrong value"), 
   Pair("u3", "5"), 
   Pair("u4", "2")
))

我想清理这个数据框,使第二列的每个值都是

  1. 尽可能转换为 Int
  2. 替换为 null、Na 或任何符号意思 "missing value"(不是 NaN,这是不同的)

我正在考虑使用 udf 函数

val stringToInt = udf[Int, String](x => try {
     x.toInt
   } catch {
     case e: Exception => null
   })

x.withColumn("Y", stringToInt(x("Y")))

... 但是 null 不是字符串,编译器拒绝它。请问有什么办法解决吗?只要我可以清理我的数据框,完全不同的方法也可以

而不是使用 null,而是使用 Option[Int]:

val pairs = Seq(
   Pair("u1", "1"), 
   Pair("u2", "wrong value"), 
   Pair("u3", "5"), 
   Pair("u4", "2")
)

def toInt(s: String): Option[Int] = try { Some(s.toInt) } catch { case NumberFormatException => None }

val stringToInt = udf[Int, Option[Int]](toInt _)

那你可以做

val x = sqlContext.createDataFrame(pairs)
x.withColumn("Y", stringToInt(x("Y")))

实际上在这种特殊情况下不需要 UDF。相反,您可以安全地使用 Column.cast 方法:

import org.apache.spark.sql.types.IntegerType
val clean = x.withColumn("Y", $"Y".cast(IntegerType)) // or .cast("integer")

clean.where($"Y".isNotNull).show
// +---+---+
// |  X|  Y|
// +---+---+
// | u1|  1|
// | u3|  5|
// | u4|  2|
// +---+---+

clean.where($"Y".isNull).show
// +---+----+
// |  X|   Y|
// +---+----+
// | u2|null|
// +---+----+