Unable to serialize SparkContext in foreachRDD
I'm trying to save streaming data from Kafka into Cassandra. I can read and parse the data, but when I call the lines below to save it I get a Task not serializable exception. My class extends Serializable, yet I'm not sure why I'm seeing this error, and three hours of googling hasn't helped much. Can anyone offer any pointers?
val collection = sc.parallelize(Seq((obj.id, obj.data)))
collection.saveToCassandra("testKS", "testTable", SomeColumns("id", "data"))
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector._
import com.datastax.spark.connector.SomeColumns
import kafka.serializer.StringDecoder

object StreamProcessor extends Serializable {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamProcessor")
      .set("spark.cassandra.connection.host", "127.0.0.1")

    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))
    val sqlContext = new SQLContext(sc)

    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val topics = args.toSet

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        rdd.foreachPartition { iter =>
          iter.foreach {
            case (key, msg) =>
              val obj = msgParseMaster(msg)
              // the two lines below are where the exception is thrown
              val collection = sc.parallelize(Seq((obj.id, obj.data)))
              collection.saveToCassandra("testKS", "testTable", SomeColumns("id", "data"))
          }
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

  import org.json4s._
  import org.json4s.native.JsonMethods._

  case class wordCount(id: Long, data: String) extends Serializable

  implicit val formats = DefaultFormats

  def msgParseMaster(msg: String): wordCount = {
    parse(msg).extract[wordCount]
  }
}
I get
org.apache.spark.SparkException: Task not serializable
Below is the full log:
16/08/06 10:24:52 ERROR JobScheduler: Error running job streaming job 1470504292000 ms.0
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition.apply(RDD.scala:919)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition.apply(RDD.scala:918)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
at
You can't call sc.parallelize within the function you pass to foreachPartition - that function has to be serialized and sent to each executor, and SparkContext is (intentionally) not serializable: it should only live in the driver application, never on the executors.

SparkContext isn't serializable, so you can't use it inside foreachRDD, and from the way you're using it you don't need it anyway. Instead, you can simply map over each RDD, parse out the relevant data, and save the resulting RDD to Cassandra:
stream
  .map { case (_, msg) =>
    val result = msgParseMaster(msg)
    (result.id, result.data)
  }
  .foreachRDD { rdd =>
    if (!rdd.isEmpty)
      rdd.saveToCassandra("testKS", "testTable", SomeColumns("id", "data"))
  }