Spark 1.6 Streaming consumer reading in kafka offset stuck at createDirectStream

Spark 1.6 Streaming consumer reading in kafka offset stuck at createDirectStream

我正在尝试将 spark streaming 偏移量读入我的消费者,但我似乎无法正确地做到这一点。

这是我的代码。

val dfoffset = hiveContext.sql(s"select * from $db")
dfoffset.show()
val dfoffsetArray = dfoffset.collect()
println("printing array of data")
dfoffsetArray.foreach(println)
val fromOffsets = collection.mutable.Map[TopicAndPartition, Long]()
for (i <- dfoffsetArray) {
  val topicAndPartition = (TopicAndPartition(i(1).toString, i(0).toString.toInt) -> (i(2).toString.toLong))
  fromOffsets += topicAndPartition
}

val kafkaParams = Map[String, String]("bootstrap.servers" -> serverName, "group.id" -> "test")
val topics = Array(topicName).toSet
//stuck here 
var directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

directKafkaStream.foreachRDD(rdd1 => { ..

这是显示数据帧的输出

partition_number|topic_name|current_offset|
+----------------+----------+--------------+
|               0|TOPIC_NAME|          4421|

非常感谢任何帮助。

我正在使用 spark 1.6、Scala 2.10.5、kafka 10

如官方文档所示KafkaUtils.createDirectStream,您应该将fromOffsets作为createDirectStream的第3个参数传递(并且不要忘记第4个参数messageHandler)。

fromOffsets参数假设是一个collection.immutable.Map[TopicAndPartition, Long],我们通常在Scala中尽可能使用不可变的而不是可变的。
您可以使用以下内容将 dfoffsetArray 转换为 immutable.Map[TopicAndPartition, Long]

val fromOffsets = dfoffsetArray.map( i =>
  TopicAndPartition(i(1).toString, i(0).toString.toInt) -> (i(2).toString.toLong)
).toMap

messageHandler(MessageAndMetadata[K, V]) ⇒ R)的类型,处理消息的key和value。您可以定义一个简单的处理程序,如下所示:

val messageHandler =
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

然后你的 createDirectStream 看起来像...

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
  (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)

现在您可以自由地对流进行一些转换。直播愉快!


几个月前 this 文章对我进行了辅导。也许你会发现它有帮助。