Apache Spark 2.1:java.lang.UnsupportedOperationException:未找到 scala.collection.immutable.Set[String] 的编码器
Apache Spark 2.1 : java.lang.UnsupportedOperationException: No Encoder found for scala.collection.immutable.Set[String]
我正在使用 Spark 2.1.1 和 Scala 2.11.6。我收到以下错误。我没有使用任何大小写 类。
java.lang.UnsupportedOperationException: No Encoder found for scala.collection.immutable.Set[String]
field (class: "scala.collection.immutable.Set", name: "_2")
field (class: "scala.Tuple2", name: "_2")
root class: "scala.Tuple2"
以下代码部分是堆栈跟踪指向的位置。
val tweetArrayRDD = nameDF.select("namedEnts", "text", "storylines")
.flatMap {
case Row(namedEnts: Traversable[(String, String)], text: String, storylines: Traversable[String]) =>
Option(namedEnts) match {
case Some(x: Traversable[(String, String)]) =>
//println("In flatMap:" + x + " ~~&~~ " + text + " ~~&~~ " + storylines)
namedEnts.map((_, (text, storylines.toSet)))
case _ => //println("In flatMap: blahhhh")
Traversable()
}
case _ => //println("In flatMap: fooooo")
Traversable()
}
.rdd.aggregateByKey((Set[String](), Set[String]()))((a, b) => (a._1 + b._1, a._2 ++ b._2), (a, b) => (a._1 ++ b._1, a._2 ++ b._2))
.map { (s: ((String, String), (Set[String], Set[String]))) => {
//println("In map: " + s)
(s._1, (s._2._1.toSeq, s._2._2.toSeq))
}}
这里的问题是 Spark 没有为 Set
提供开箱即用的编码器(它确实为 "primitives"、序列、数组和其他支持的产品提供了编码器类型)。
您可以尝试使用 为 Set[String]
创建您自己的编码器(更准确地说,是您正在使用的 Traversable[((String, String), (String, Set[String]))]
类型的编码器,其中包含 Set[String]
), 或 您可以使用 Seq
而不是 Set
:
来解决此问题
// ...
case Some(x: Traversable[(String, String)]) =>
//println("In flatMap:" + x + " ~~&~~ " + text + " ~~&~~ " + storylines)
namedEnts.map((_, (text, storylines.toSeq.distinct)))
// ...
(我正在使用 distinct
模仿 Set
行为;也可以尝试 .toSet.toSeq
)
更新:根据您对 Spark 1.6.2 的评论 - 不同之处在于在 1.6.2 中,Dataset.flatMap
returns 和 RDD
而不是 Dataset
,因此不需要对您提供的函数返回的结果进行编码;因此,这确实带来了另一个很好的解决方法——您可以通过显式切换到使用 RDD before flatMap
操作来轻松模拟此行为:
nameDF.select("namedEnts", "text", "storylines")
.rdd
.flatMap { /*...*/ } // use your function as-is, it can return Set[String]
.aggregateByKey( /*...*/ )
.map( /*...*/ )
我正在使用 Spark 2.1.1 和 Scala 2.11.6。我收到以下错误。我没有使用任何大小写 类。
java.lang.UnsupportedOperationException: No Encoder found for scala.collection.immutable.Set[String]
field (class: "scala.collection.immutable.Set", name: "_2")
field (class: "scala.Tuple2", name: "_2")
root class: "scala.Tuple2"
以下代码部分是堆栈跟踪指向的位置。
val tweetArrayRDD = nameDF.select("namedEnts", "text", "storylines")
.flatMap {
case Row(namedEnts: Traversable[(String, String)], text: String, storylines: Traversable[String]) =>
Option(namedEnts) match {
case Some(x: Traversable[(String, String)]) =>
//println("In flatMap:" + x + " ~~&~~ " + text + " ~~&~~ " + storylines)
namedEnts.map((_, (text, storylines.toSet)))
case _ => //println("In flatMap: blahhhh")
Traversable()
}
case _ => //println("In flatMap: fooooo")
Traversable()
}
.rdd.aggregateByKey((Set[String](), Set[String]()))((a, b) => (a._1 + b._1, a._2 ++ b._2), (a, b) => (a._1 ++ b._1, a._2 ++ b._2))
.map { (s: ((String, String), (Set[String], Set[String]))) => {
//println("In map: " + s)
(s._1, (s._2._1.toSeq, s._2._2.toSeq))
}}
这里的问题是 Spark 没有为 Set
提供开箱即用的编码器(它确实为 "primitives"、序列、数组和其他支持的产品提供了编码器类型)。
您可以尝试使用 Set[String]
创建您自己的编码器(更准确地说,是您正在使用的 Traversable[((String, String), (String, Set[String]))]
类型的编码器,其中包含 Set[String]
), 或 您可以使用 Seq
而不是 Set
:
// ...
case Some(x: Traversable[(String, String)]) =>
//println("In flatMap:" + x + " ~~&~~ " + text + " ~~&~~ " + storylines)
namedEnts.map((_, (text, storylines.toSeq.distinct)))
// ...
(我正在使用 distinct
模仿 Set
行为;也可以尝试 .toSet.toSeq
)
更新:根据您对 Spark 1.6.2 的评论 - 不同之处在于在 1.6.2 中,Dataset.flatMap
returns 和 RDD
而不是 Dataset
,因此不需要对您提供的函数返回的结果进行编码;因此,这确实带来了另一个很好的解决方法——您可以通过显式切换到使用 RDD before flatMap
操作来轻松模拟此行为:
nameDF.select("namedEnts", "text", "storylines")
.rdd
.flatMap { /*...*/ } // use your function as-is, it can return Set[String]
.aggregateByKey( /*...*/ )
.map( /*...*/ )