Not Serializable exception when reading Kafka records with Spark Streaming
When streaming from Kafka with Spark 2.0, I get the following error:
org.apache.spark.SparkException:
Job aborted due to stage failure:
Task 0.0 in stage 1.0 (TID 1) had a not serializable result:
org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
- object not serializable (class:
org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(
topic = mytopic, partition = 0, offset = 422337,
CreateTime = 1472871209063, checksum = 2826679694,
serialized key size = -1, serialized value size = 95874,
key = null, value = <JSON GOES HERE...>
Here is the relevant part of my code:
val ssc = new StreamingContext(sc, Seconds(2))
val topics = Array("ecfs")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream
  .map(_.value())
  .flatMap(message => {
    // parsing here...
  })
  .foreachRDD(rdd => {
    // processing here...
  })

ssc.start()
As far as I can tell, the line causing the problem is .map(_.value()). How can I fix this?
You cannot use .map on a DStream[String, String] the way you are using it there. I think you can use transform and then apply the map as follows:
val streamed_rdd_final = streamed_rdd.transform { rdd =>
  rdd
    .map(x => x.split("\t"))
    .map(x => Array(
      check_time_to_send.toString,
      check_time_to_send_utc.toString,
      x(1), x(2), x(3), x(4), x(5)
    ))
    .map(x => x(1) + "\t" + x(2) + "\t" + x(3) + "\t" + x(4) + "\t" +
              x(5) + "\t" + x(6) + "\t" + x(7) + "\t")
}
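Adapted to the stream in the question, the same transform idea would extract the String value inside the RDD transformation so that no ConsumerRecord (which is not serializable) leaves the stage. A rough sketch under that assumption, with the processing step kept as a stand-in:

val values = stream.transform { rdd =>
  // Map each ConsumerRecord to its plain String value inside the
  // transformation, so only Strings are passed downstream.
  rdd.map(record => record.value())
}

values.foreachRDD { rdd =>
  // processing here; a simple count as a stand-in
  println(s"records in batch: ${rdd.count()}")
}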
Or you can keep using .map as before, but instead of _.value() you should try passing a function into the map, as I have done below:
stream.map{case (x, y) => (y.toString)}
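Note that with the 0.10 direct stream shown in the question the elements are ConsumerRecord objects rather than (key, value) pairs, so the pattern-match style above would first need the record projected into a tuple. A minimal sketch, assuming the question's stream type:

stream
  .map(record => (record.key(), record.value()))   // project the record into a plain tuple
  .map { case (_, value) => value.toString }        // then pattern-match as in the answer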