从 RDD 访问 KafkaOffset 时出现异常
Exception while accessing KafkaOffset from RDD
我有一个从 Kafka 流式传输的 Spark 消费者。
我正在尝试管理恰好一次语义的偏移量。
但是,在访问偏移量时抛出以下异常:
"java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD
cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges"
执行此操作的代码部分如下:
var offsetRanges = Array[OffsetRange]()
dataStream
.transform {
rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}
.foreachRDD(rdd => { })
这里的 dataStream 是使用 KafkaUtils API 创建的直接流 (DStream[String]) 类似于 :
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t)).map(_._2)
如果有人能帮助我理解我在这里做错了什么。
transform 是官方文档中提到的对数据流执行的方法链中的第一个方法
谢谢。
您的问题是:
.map(._2)
创建 MapPartitionedDStream
而不是 KafkaUtils.createKafkaStream
创建的 DirectKafkaInputDStream
。
您需要 map
在 transform
之后:
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+""+t))
kafkaStream
.transform {
rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}
.map(_._2)
.foreachRDD(rdd => // stuff)
我有一个从 Kafka 流式传输的 Spark 消费者。 我正在尝试管理恰好一次语义的偏移量。
但是,在访问偏移量时抛出以下异常:
"java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges"
执行此操作的代码部分如下:
var offsetRanges = Array[OffsetRange]()
dataStream
.transform {
rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}
.foreachRDD(rdd => { })
这里的 dataStream 是使用 KafkaUtils API 创建的直接流 (DStream[String]) 类似于 :
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t)).map(_._2)
如果有人能帮助我理解我在这里做错了什么。 transform 是官方文档中提到的对数据流执行的方法链中的第一个方法
谢谢。
您的问题是:
.map(._2)
创建 MapPartitionedDStream
而不是 KafkaUtils.createKafkaStream
创建的 DirectKafkaInputDStream
。
您需要 map
在 transform
之后:
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+""+t))
kafkaStream
.transform {
rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}
.map(_._2)
.foreachRDD(rdd => // stuff)