Spark -- Kafka streaming exception -- object not serializable (ConsumerRecord)

I am running a Kafka stream reader on Spark.

Below are the dependencies:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.0.1</version>
    </dependency>
</dependencies>

When some data, say 'Hi----3', is produced to the Kafka topic, I get the following exception (I can see my data inside the exception):

Serialization stack:
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = q_metrics, partition = 0, offset = 26, CreateTime = 1480588636828, checksum = 3939660770, serialized key size = -1, serialized value size = 9, key = null, value = "Hi----3"))

I am not doing any computation on the RDD (that throws the same exception as well). Even stream.print() throws the exception.
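
For context, DStream.print() internally takes the first few elements of each batch's RDD and ships them to the driver, which is what forces the ConsumerRecord objects to be serialized. A simplified sketch of what print() effectively does per batch (not the actual Spark source):

stream.foreachRDD { rdd =>
  // take() sends elements from the executors to the driver,
  // which requires Java-serializing each ConsumerRecord
  rdd.take(11).foreach(println)
}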

Here is the code:

import org.apache.spark.streaming._
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.rdd.RDD

class Metrics {

  def readKafka() {
    // Kafka consumer settings: string keys/values, manual offset commits
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val topics = Array("q_metrics")
    // Local SparkContext with 4 threads and a 10-second batch interval
    val sc = new SparkContext("local[4]", "ScalaKafkaConsumer")
    val streamingContext = new StreamingContext(sc, Seconds(10))

    // Each element of this stream is a ConsumerRecord[String, String]
    val stream = KafkaUtils.createDirectStream[String, String](
                            streamingContext,
                            PreferConsistent,
                            Subscribe[String, String](topics, kafkaParams))

    stream.print() // this is where the exception is thrown


    streamingContext.start()

    streamingContext.awaitTermination()

  }

  def rddReader(rdd: Array[String]) = {

  }
}

object MetricsReader {
  def main(args: Array[String]): Unit = {
    val objMetrics = new Metrics()
    objMetrics.readKafka()
  }
}

Any help is appreciated.

Thanks

Found the problem: we cannot print the stream directly, because print() operates on the ConsumerRecord objects themselves. So I used map to pull the key and value out of each record, collected the pairs, and then printed them:

stream.foreachRDD { rdd =>
  // Map each ConsumerRecord to a serializable (key, value) tuple,
  // then collect the batch to the driver and print it
  val collected = rdd.map(record => (record.key(), record.value())).collect()
  for (c <- collected) {
    println(c)
  }
}
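
Note that collect() pulls the whole batch into driver memory. A lighter sketch under the same assumptions (the stream above): mapping to plain (String, String) tuples first makes the elements serializable, so print() then works directly and only a handful of elements per batch ever reach the driver.

// Tuples of Strings are serializable, so print() no longer fails
stream.map(record => (record.key(), record.value())).print()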