Spark 2 数据集空值异常
Spark 2 Dataset Null value exception
在 spark 中获取此空错误 Dataset.filter
输入 CSV:
name,age,stat
abc,22,m
xyz,,s
工作代码:
case class Person(name: String, age: Long, stat: String)
val peopleDS = spark.read.option("inferSchema","true")
.option("header", "true").option("delimiter", ",")
.csv("./people.csv").as[Person]
peopleDS.show()
peopleDS.createOrReplaceTempView("people")
spark.sql("select * from people where age > 30").show()
失败代码(添加以下行return错误):
val filteredDS = peopleDS.filter(_.age > 30)
filteredDS.show()
Returns 空错误
java.lang.RuntimeException: Null value appeared in non-nullable field:
- field (class: "scala.Long", name: "age")
- root class: "com.gcp.model.Person"
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
你得到的异常应该解释一切,但让我们开始 step-by-step:
当使用 csv
数据源加载数据时,所有字段都标记为 nullable
:
val path: String = ???
val peopleDF = spark.read
.option("inferSchema","true")
.option("header", "true")
.option("delimiter", ",")
.csv(path)
peopleDF.printSchema
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- stat: string (nullable = true)
缺失字段表示为 SQL NULL
peopleDF.where($"age".isNull).show
+----+----+----+
|name| age|stat|
+----+----+----+
| xyz|null| s|
+----+----+----+
接下来将 Dataset[Row]
转换为 Dataset[Person]
,后者使用 Long
对 age
字段进行编码。 Long
在 Scala 中不能是 null
。因为输入模式是 nullable
,尽管如此,输出模式仍保持 nullable
:
val peopleDS = peopleDF.as[Person]
peopleDS.printSchema
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- stat: string (nullable = true)
请注意,它 as[T]
根本不会影响架构。
当您使用 SQL(已注册 table)或 DataFrame
API 查询 Dataset
时,Spark 不会反序列化目的。由于模式仍然是 nullable
我们可以执行:
peopleDS.where($"age" > 30).show
+----+---+----+
|name|age|stat|
+----+---+----+
+----+---+----+
没有任何问题。这只是一个简单的 SQL 逻辑,NULL
是一个有效值。
当我们使用静态类型时Dataset
API:
peopleDS.filter(_.age > 30)
Spark 必须反序列化对象。因为 Long
不能是 null
(SQL NULL
) 它会失败,出现异常。
如果不是因为你会得到 NPE。
数据的正确静态类型表示应使用 Optional
类型:
case class Person(name: String, age: Option[Long], stat: String)
具有调整后的过滤功能:
peopleDS.filter(_.age.map(_ > 30).getOrElse(false))
+----+---+----+
|name|age|stat|
+----+---+----+
+----+---+----+
如果您愿意,可以使用模式匹配:
peopleDS.filter {
case Some(age) => age > 30
case _ => false // or case None => false
}
请注意,您不必(但无论如何都会推荐)为 name
和 stat
使用可选类型。因为 Scala String
只是一个 Java String
它可以是 null
。当然,如果您采用这种方法,则必须明确检查访问的值是否为 null
。
相关
在 spark 中获取此空错误 Dataset.filter
输入 CSV:
name,age,stat
abc,22,m
xyz,,s
工作代码:
case class Person(name: String, age: Long, stat: String)
val peopleDS = spark.read.option("inferSchema","true")
.option("header", "true").option("delimiter", ",")
.csv("./people.csv").as[Person]
peopleDS.show()
peopleDS.createOrReplaceTempView("people")
spark.sql("select * from people where age > 30").show()
失败代码(添加以下行return错误):
val filteredDS = peopleDS.filter(_.age > 30)
filteredDS.show()
Returns 空错误
java.lang.RuntimeException: Null value appeared in non-nullable field:
- field (class: "scala.Long", name: "age")
- root class: "com.gcp.model.Person"
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
你得到的异常应该解释一切,但让我们开始 step-by-step:
当使用
csv
数据源加载数据时,所有字段都标记为nullable
:val path: String = ??? val peopleDF = spark.read .option("inferSchema","true") .option("header", "true") .option("delimiter", ",") .csv(path) peopleDF.printSchema
root |-- name: string (nullable = true) |-- age: integer (nullable = true) |-- stat: string (nullable = true)
缺失字段表示为 SQL
NULL
peopleDF.where($"age".isNull).show
+----+----+----+ |name| age|stat| +----+----+----+ | xyz|null| s| +----+----+----+
接下来将
Dataset[Row]
转换为Dataset[Person]
,后者使用Long
对age
字段进行编码。Long
在 Scala 中不能是null
。因为输入模式是nullable
,尽管如此,输出模式仍保持nullable
:val peopleDS = peopleDF.as[Person] peopleDS.printSchema
root |-- name: string (nullable = true) |-- age: integer (nullable = true) |-- stat: string (nullable = true)
请注意,它
as[T]
根本不会影响架构。当您使用 SQL(已注册 table)或
DataFrame
API 查询Dataset
时,Spark 不会反序列化目的。由于模式仍然是nullable
我们可以执行:peopleDS.where($"age" > 30).show
+----+---+----+ |name|age|stat| +----+---+----+ +----+---+----+
没有任何问题。这只是一个简单的 SQL 逻辑,
NULL
是一个有效值。当我们使用静态类型时
Dataset
API:peopleDS.filter(_.age > 30)
Spark 必须反序列化对象。因为
Long
不能是null
(SQLNULL
) 它会失败,出现异常。如果不是因为你会得到 NPE。
数据的正确静态类型表示应使用
Optional
类型:case class Person(name: String, age: Option[Long], stat: String)
具有调整后的过滤功能:
peopleDS.filter(_.age.map(_ > 30).getOrElse(false))
+----+---+----+ |name|age|stat| +----+---+----+ +----+---+----+
如果您愿意,可以使用模式匹配:
peopleDS.filter { case Some(age) => age > 30 case _ => false // or case None => false }
请注意,您不必(但无论如何都会推荐)为
name
和stat
使用可选类型。因为 ScalaString
只是一个 JavaString
它可以是null
。当然,如果您采用这种方法,则必须明确检查访问的值是否为null
。
相关