Apache Spark Object not Serializable Exception for json parser
I am reading data (json as String) from a Kafka queue and trying to parse the json String into a case class using lift-json.
Here is the code snippet:
val sparkStreamingContext = new StreamingContext(sparkConf, Seconds(5))

val kafkaParam: Map[String, String] = Map(
  "bootstrap.servers" -> kafkaServer,
  "key.deserializer" -> classOf[StringDeserializer].getCanonicalName,
  "value.deserializer" -> classOf[StringDeserializer].getCanonicalName,
  "zookeeper.connect" -> zookeeperUrl,
  "group.id" -> "demo-group")

import org.apache.spark.streaming.kafka._
import net.liftweb.json.{DefaultFormats, Formats}
import net.liftweb.json._

val topicSet = Map(kafkaTopic -> 1)
val streaming = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](sparkStreamingContext, kafkaParam, topicSet, StorageLevel.MEMORY_AND_DISK)

streaming.map { case (id, tweet) =>
  implicit val formats: Formats = DefaultFormats
  (id, parse(tweet).extract[Tweet])
}.print()

sparkStreamingContext.start()
sparkStreamingContext.awaitTermination()
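The Tweet case class is not included in the snippet; judging from the serialization stack below (Tweet carries a formats field), it presumably holds its own Formats reference, roughly like this sketch with guessed field names:

// Hypothetical reconstruction of Tweet: the field names are guesses, but a
// Formats member matches the "field ... name: formats" entry in the trace.
case class Tweet(user: String, song: String) {
  implicit val formats: Formats = DefaultFormats
}

With such a definition, extract[Tweet] fills the constructor fields from the parsed JSON, and every Tweet instance also drags a Formats reference along with it.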
I am getting this exception:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 1.0 (TID 1) had a not serializable result: net.liftweb.json.DefaultFormats$
Serialization stack:
- object not serializable (class: net.liftweb.json.DefaultFormats$, value: net.liftweb.json.DefaultFormats$@74a2fec)
- field (class: Tweet, name: formats, type: interface net.liftweb.json.Formats)
- object (class Tweet, Tweet(Akash24,Adele))
- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
- object (class scala.Tuple2, (1,Tweet(Akash24,Adele)))
- element of array (index: 0)
- array (class [Lscala.Tuple2;, size 11)
Can anyone help me fix this issue?
Any help would be appreciated.
Thanks
From the log it looks like a plain class-not-serializable exception. The fix is to register the class with Kryo, using the following code:
sparkConf.registerKryoClasses(Array(classOf[DefaultFormats]))

val sparkStreamingContext = new StreamingContext(sparkConf, Seconds(5))

val kafkaParam: Map[String, String] = Map(
  "bootstrap.servers" -> kafkaServer,
  "key.deserializer" -> classOf[StringDeserializer].getCanonicalName,
  "value.deserializer" -> classOf[StringDeserializer].getCanonicalName,
  "zookeeper.connect" -> zookeeperUrl,
  "group.id" -> "demo-group")

import org.apache.spark.streaming.kafka._
import net.liftweb.json.{DefaultFormats, Formats}
import net.liftweb.json._

val topicSet = Map(kafkaTopic -> 1)
val streaming = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](sparkStreamingContext, kafkaParam, topicSet, StorageLevel.MEMORY_AND_DISK)

streaming.map { case (id, tweet) =>
  implicit val formats: Formats = DefaultFormats
  (id, parse(tweet).extract[Tweet])
}.print()

sparkStreamingContext.start()
sparkStreamingContext.awaitTermination()
This makes the DefaultFormats class serializable (via Kryo), so the Spark driver can ship the implicit val formats to all worker nodes.
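For completeness, sparkConf itself is not shown in either snippet. A minimal sketch of the setup, assuming a placeholder app name and a local master, with registerKryoClasses called before the StreamingContext is created:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import net.liftweb.json.DefaultFormats

// Placeholder app name and master, for illustration only.
val sparkConf = new SparkConf()
  .setAppName("kafka-json-demo")
  .setMaster("local[2]")

// Register DefaultFormats with Kryo; registerKryoClasses also sets
// spark.serializer to KryoSerializer, so task results carrying a Formats
// value are serialized with Kryo rather than Java serialization.
sparkConf.registerKryoClasses(Array(classOf[DefaultFormats]))

val sparkStreamingContext = new StreamingContext(sparkConf, Seconds(5))

The registration must happen on the SparkConf before the StreamingContext is constructed, because the context reads the configuration at creation time.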