Spark 2 期权数据集
Spark 2 Datasets of Options
我有一个字符串数据集,我使用一个可能会失败的函数(例如,如果我尝试解析的数据不可用)将其解析为一个案例的数据集 class。出于这个原因,函数 returns 一个 Option (Scala)。
所以我最终得到了 Option[MyCaseClass].
的数据集
Spark 似乎接受该数据集并对其进行处理,但如果解析失败则不返回 None
而是 returns 我 Some(MyCaseClass(null, null...))
。
这是一个代码示例:
recordsDs
.map { record =>
val maybeArticle = unmarshallArticle(record)
if (maybeArticle.isEmpty) {
println(s"Could not parse record $record into an article.")
}
maybeArticle
}
.filter(_.isDefined)
.map(_.get)
.collect().toList // Always returns a List(Some(Article(null, null), Some(Article...
我的猜测是,在序列化然后反序列化 Option 值时,Spark 使用 Some() 构造函数而不是检查 Option 是 Some 还是 None。
我显然可以围绕我的对象创建一个包装器,例如 MaybeArticle(article: Option[Article])
,但我想知道 Spark 是否可以正确处理 Options 的数据集?
我认为解决方案是使用 flatMap
。这是一个非常愚蠢的例子:
scala> val ds = Seq(("a1"), ("a2"), ("a4"), ("b1"), ("b2")).toDS
ds: org.apache.spark.sql.Dataset[String] = [value: string]
scala> ds.show
+-----+
|value|
+-----+
| a1|
| a2|
| a4|
| b1|
| b2|
+-----+
scala> val ds2 = ds.flatMap{x => if (x.contains("a")) Some(x) else None}
ds2: org.apache.spark.sql.Dataset[String] = [value: string]
scala> ds2.show
+-----+
|value|
+-----+
| a1|
| a2|
| a4|
+-----+
之所以可行,是因为 Some
和 None
就像可以使用 flatMap
解压的集合(其中 None
元素被省略)。
我有一个字符串数据集,我使用一个可能会失败的函数(例如,如果我尝试解析的数据不可用)将其解析为一个案例的数据集 class。出于这个原因,函数 returns 一个 Option (Scala)。 所以我最终得到了 Option[MyCaseClass].
的数据集Spark 似乎接受该数据集并对其进行处理,但如果解析失败则不返回 None
而是 returns 我 Some(MyCaseClass(null, null...))
。
这是一个代码示例:
recordsDs
.map { record =>
val maybeArticle = unmarshallArticle(record)
if (maybeArticle.isEmpty) {
println(s"Could not parse record $record into an article.")
}
maybeArticle
}
.filter(_.isDefined)
.map(_.get)
.collect().toList // Always returns a List(Some(Article(null, null), Some(Article...
我的猜测是,在序列化然后反序列化 Option 值时,Spark 使用 Some() 构造函数而不是检查 Option 是 Some 还是 None。
我显然可以围绕我的对象创建一个包装器,例如 MaybeArticle(article: Option[Article])
,但我想知道 Spark 是否可以正确处理 Options 的数据集?
我认为解决方案是使用 flatMap
。这是一个非常愚蠢的例子:
scala> val ds = Seq(("a1"), ("a2"), ("a4"), ("b1"), ("b2")).toDS
ds: org.apache.spark.sql.Dataset[String] = [value: string]
scala> ds.show
+-----+
|value|
+-----+
| a1|
| a2|
| a4|
| b1|
| b2|
+-----+
scala> val ds2 = ds.flatMap{x => if (x.contains("a")) Some(x) else None}
ds2: org.apache.spark.sql.Dataset[String] = [value: string]
scala> ds2.show
+-----+
|value|
+-----+
| a1|
| a2|
| a4|
+-----+
之所以可行,是因为 Some
和 None
就像可以使用 flatMap
解压的集合(其中 None
元素被省略)。