foreachRDD 中的案例 Class 导致序列化错误
Case Class within foreachRDD causes Serialization Error
我可以在 foreachRDD 中创建一个 DF 如果我不尝试使用 Case Class 并且只是让列的默认名称使用 toDF() 或者如果我通过 toDF("c1, "c2").
当我尝试使用案例 Class 并查看示例后,我得到:
Task not serializable
如果我移动 Case Class 语句,我会得到:
toDF() not part of RDD[CaseClass]
这是遗留问题,但我很好奇 Spark 可能产生的第 n 个序列化错误,以及它是否会延续到结构化流中。
我有一个不需要拆分的 RDD,这可能是问题所在吗?不。 运行 在 DataBricks 中?
编码如下:
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
case class Person(name: String, age: Int) //extends Serializable // Some say inherently serializable so not required
val spark = SparkSession.builder
.master("local[4]")
.config("spark.driver.cores", 2)
.appName("forEachRDD")
.getOrCreate()
val sc = spark.sparkContext
val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
val rddQueue = new mutable.Queue[RDD[List[(String, Int)]]]()
val QS = ssc.queueStream(rddQueue)
QS.foreachRDD(q => {
if(!q.isEmpty) {
import spark.implicits._
val q_flatMap = q.flatMap{x=>x}
val q_withPerson = q_flatMap.map(field => Person(field._1, field._2))
val df = q_withPerson.toDF()
df.show(false)
}
}
)
ssc.start()
for (c <- List(List(("Fred",53), ("John",22), ("Mary",76)), List(("Bob",54), ("Johnny",92), ("Margaret",15)), List(("Alfred",21), ("Patsy",34), ("Sylvester",7)) )) {
rddQueue += ssc.sparkContext.parallelize(List(c))
}
ssc.awaitTermination()
我不是和 Java 一起长大的,但环顾四周,我知道该怎么做,但我不够专业,无法解释。
我 运行 在 DataBricks 笔记本上制作原型。
线索是
case class Person(name: String, age: Int)
在同一个数据库笔记本中。需要在当前笔记本外部定义案例 class - 在单独的笔记本中 - 从而与 Streaming class 运行 分开。
我可以在 foreachRDD 中创建一个 DF 如果我不尝试使用 Case Class 并且只是让列的默认名称使用 toDF() 或者如果我通过 toDF("c1, "c2").
当我尝试使用案例 Class 并查看示例后,我得到:
Task not serializable
如果我移动 Case Class 语句,我会得到:
toDF() not part of RDD[CaseClass]
这是遗留问题,但我很好奇 Spark 可能产生的第 n 个序列化错误,以及它是否会延续到结构化流中。
我有一个不需要拆分的 RDD,这可能是问题所在吗?不。 运行 在 DataBricks 中?
编码如下:
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
case class Person(name: String, age: Int) //extends Serializable // Some say inherently serializable so not required
val spark = SparkSession.builder
.master("local[4]")
.config("spark.driver.cores", 2)
.appName("forEachRDD")
.getOrCreate()
val sc = spark.sparkContext
val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
val rddQueue = new mutable.Queue[RDD[List[(String, Int)]]]()
val QS = ssc.queueStream(rddQueue)
QS.foreachRDD(q => {
if(!q.isEmpty) {
import spark.implicits._
val q_flatMap = q.flatMap{x=>x}
val q_withPerson = q_flatMap.map(field => Person(field._1, field._2))
val df = q_withPerson.toDF()
df.show(false)
}
}
)
ssc.start()
for (c <- List(List(("Fred",53), ("John",22), ("Mary",76)), List(("Bob",54), ("Johnny",92), ("Margaret",15)), List(("Alfred",21), ("Patsy",34), ("Sylvester",7)) )) {
rddQueue += ssc.sparkContext.parallelize(List(c))
}
ssc.awaitTermination()
我不是和 Java 一起长大的,但环顾四周,我知道该怎么做,但我不够专业,无法解释。
我 运行 在 DataBricks 笔记本上制作原型。
线索是
case class Person(name: String, age: Int)
在同一个数据库笔记本中。需要在当前笔记本外部定义案例 class - 在单独的笔记本中 - 从而与 Streaming class 运行 分开。