Spark Kafka streaming exception -- object not serializable: ConsumerRecord
I am running a Kafka stream reader on Spark.
Below are the dependencies:
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.0.1</version>
</dependency>
</dependencies>
When some data, say 'Hi----3', is produced to the Kafka topic, I get the following exception (I can see the data inside the exception):
Serialization stack:
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = q_metrics, partition = 0, offset = 26, CreateTime = 1480588636828, checksum = 3939660770, serialized key size = -1, serialized value size = 9, key = null, value = "Hi----3"))
I am not doing any computation on the RDD (that throws the same exception too). Even stream.print() throws the exception.
The code is below:
import org.apache.spark.streaming._
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.rdd.RDD
class Metrics {
  def readKafka(): Unit = {
    // Kafka consumer configuration
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))
    val topics = Array("q_metrics")
    val sc = new SparkContext("local[4]", "ScalaKafkaConsumer")
    val streamingContext = new StreamingContext(sc, Seconds(10))
    // Direct stream of ConsumerRecord[String, String]
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))
    stream.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
  def rddReader(rdd: Array[String]) = {
  }
}
object MetricsReader {
  def main(args: Array[String]): Unit = {
    val objMetrics = new Metrics()
    objMetrics.readKafka()
  }
}
Any help is appreciated.
Thanks
Found the problem: we cannot print the stream directly, because print() has to ship the ConsumerRecord objects to the driver, and ConsumerRecord is not serializable. So I used map to extract the key and value from each record, collected those, and then printed them:
stream.foreachRDD { rdd =>
  // ConsumerRecord is not serializable, but its key and value are plain Strings
  val collected = rdd.map(record => (record.key(), record.value())).collect()
  for (c <- collected) {
    println(c)
  }
}
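An equivalent, slightly shorter variant (a minimal sketch on my part, assuming the same stream value as above) is to map the DStream to (key, value) tuples before any output action, so Spark only ever has to serialize plain Strings and print() works directly:
// Sketch: convert each ConsumerRecord to a serializable (key, value) tuple
// before print(), which moves data to the driver.
val pairs = stream.map(record => (record.key(), record.value()))
pairs.print()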