无法通过火花流消费卡夫卡消息
unable to consume kafka message through spark stream
我正在尝试通过 spark 流程序使用来自 kafka 生产者的消息。
这是我的程序
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
// lines.print()
lines.foreachRDD(rdd=>{
rdd.foreach(message=>
println(message))
})
以上程序运行成功。但是我看不到任何消息被打印出来。
使用 "local[*]"
设置您的主人 url
val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")
您也可以尝试调用 collect() 并查看是否收到消息。
lines.foreachRDD { rdd =>
rdd.collect().foreach(println)
}
我正在尝试通过 spark 流程序使用来自 kafka 生产者的消息。
这是我的程序
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
// lines.print()
lines.foreachRDD(rdd=>{
rdd.foreach(message=>
println(message))
})
以上程序运行成功。但是我看不到任何消息被打印出来。
使用 "local[*]"
val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")
您也可以尝试调用 collect() 并查看是否收到消息。
lines.foreachRDD { rdd =>
rdd.collect().foreach(println)
}