从 RDD 访问 KafkaOffset 时出现异常

Exception while accessing KafkaOffset from RDD

我有一个从 Kafka 流式传输的 Spark 消费者。 我正在尝试管理恰好一次语义的偏移量。

但是,在访问偏移量时抛出以下异常:

"java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges"

执行此操作的代码部分如下:

var offsetRanges = Array[OffsetRange]()
dataStream
  .transform { 
    rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
   }
   .foreachRDD(rdd => { })

这里的 dataStream 是使用 KafkaUtils API 创建的直接流 (DStream[String]) 类似于 :

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t)).map(_._2)

如果有人能帮助我理解我在这里做错了什么。 transform 是官方文档中提到的对数据流执行的方法链中的第一个方法

谢谢。

您的问题是:

.map(._2)

创建 MapPartitionedDStream 而不是 KafkaUtils.createKafkaStream 创建的 DirectKafkaInputDStream

您需要 maptransform 之后:

val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+""+t))

kafkaStream
  .transform { 
    rdd => 
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
  }
  .map(_._2)
  .foreachRDD(rdd => // stuff)