如何在 Spark Streaming 中为窗口操作执行 combineByKey()
How to perform combineByKey() for windowed operations in Spark Streaming
我正在寻找 combineByKeyAndWindow()
之类的东西,但是找不到。有没有办法使用 combineByKey()
和 foreachRDD
来模仿它的功能?
已编辑:
object DirectKafkaWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println(s"""
|Usage: DirectKafkaWordCount <brokers> <topics>
| <brokers> is a list of one or more Kafka brokers
| <topics> is a list of one or more kafka topics to consume from
|
""".stripMargin)
System.exit(1)
}
val Array(brokers, topics) = args
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
val lines = messages.map(_._2)
var counter = 0
lines.window(Seconds(10),Seconds(5)).foreachRDD { (rdd : RDD[String], time : Time) =>
val wc = rdd.flatMap(_.split(" ")).map(x => (x,1)).reduceByKey(_+_)
wc.coalesce(1).saveAsTextFile("file:///home/vdep/output/temp"+counter)
counter = counter + 1
}
ssc.checkpoint("/home/vdep/kafkaOutput/kafkachkpt/")
ssc.start()
ssc.awaitTermination()
}
}
以上是一个使用 .window()
& .foreachRDD()
的简单的 wordCount 程序。我在每次迭代(RDD)时将结果保存到一个文件中。但是只有 temp0 保存在输出文件夹中。
lines
是 Kafka DirectStream。
但码字如期无.window()
.
您可能会通过以下操作获得相同的结果:
dstream.window(...).transform(rdd=> rdd.combineByKey(...))
尽管您会错过 reducers 对 window 中数据大小提供的优化。
我正在寻找 combineByKeyAndWindow()
之类的东西,但是找不到。有没有办法使用 combineByKey()
和 foreachRDD
来模仿它的功能?
已编辑:
object DirectKafkaWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println(s"""
|Usage: DirectKafkaWordCount <brokers> <topics>
| <brokers> is a list of one or more Kafka brokers
| <topics> is a list of one or more kafka topics to consume from
|
""".stripMargin)
System.exit(1)
}
val Array(brokers, topics) = args
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
val lines = messages.map(_._2)
var counter = 0
lines.window(Seconds(10),Seconds(5)).foreachRDD { (rdd : RDD[String], time : Time) =>
val wc = rdd.flatMap(_.split(" ")).map(x => (x,1)).reduceByKey(_+_)
wc.coalesce(1).saveAsTextFile("file:///home/vdep/output/temp"+counter)
counter = counter + 1
}
ssc.checkpoint("/home/vdep/kafkaOutput/kafkachkpt/")
ssc.start()
ssc.awaitTermination()
}
}
以上是一个使用 .window()
& .foreachRDD()
的简单的 wordCount 程序。我在每次迭代(RDD)时将结果保存到一个文件中。但是只有 temp0 保存在输出文件夹中。
lines
是 Kafka DirectStream。
但码字如期无.window()
.
您可能会通过以下操作获得相同的结果:
dstream.window(...).transform(rdd=> rdd.combineByKey(...))
尽管您会错过 reducers 对 window 中数据大小提供的优化。