How to perform combineByKey() for windowed operations in Spark Streaming

I am looking for something like combineByKeyAndWindow(), but I cannot find it. Is there a way to mimic its functionality using combineByKey() and foreachRDD?

Edit:

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectKafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics) = args

    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    val lines = messages.map(_._2)

    // Count words over a 10-second window that slides every 5 seconds,
    // writing each window's result to a numbered output directory.
    var counter = 0
    lines.window(Seconds(10), Seconds(5)).foreachRDD { (rdd: RDD[String], time: Time) =>
      val wc = rdd.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
      wc.coalesce(1).saveAsTextFile("file:///home/vdep/output/temp" + counter)
      counter = counter + 1
    }

    ssc.checkpoint("/home/vdep/kafkaOutput/kafkachkpt/")
    ssc.start()
    ssc.awaitTermination()
  }
}

The above is a simple wordCount program using .window() and .foreachRDD(). I save the results to a file on every iteration (RDD), but only temp0 ends up in the output folder.

lines is the Kafka DirectStream.

But the wordCount works as expected without .window().
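
For what it's worth, here is a minimal sketch that derives the output path from the batch Time that foreachRDD already receives, instead of relying on a mutable driver-side counter. It reuses lines and the imports from the snippet above; the output path scheme is illustrative only:

// Sketch: derive a unique directory per window from the batch time
// instead of mutating a counter on the driver.
lines.window(Seconds(10), Seconds(5)).foreachRDD { (rdd: RDD[String], time: Time) =>
  val wc = rdd.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
  // time.milliseconds is unique per batch, so nothing is overwritten.
  wc.coalesce(1).saveAsTextFile(s"file:///home/vdep/output/wc-${time.milliseconds}")
}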

You can probably achieve the same result with:

dstream.window(...).transform(rdd => rdd.combineByKey(...))

although you will miss the optimizations that the reducers provide over the size of the data in the window.
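
As a concrete illustration of that pattern, here is a minimal, self-contained sketch that computes a per-key average over each window with combineByKey. The socket source, window sizes, and combiner functions are assumptions made for the example, not part of the question:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CombineByKeyWindowSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("CombineByKeyWindow"), Seconds(5))

    // Hypothetical input: a socket stream of "key value" lines.
    val pairs = ssc.socketTextStream("localhost", 9999)
      .map(_.split(" "))
      .map(a => (a(0), a(1).toDouble))

    // combineByKey over each 10-second window, sliding every 5 seconds:
    // accumulate (sum, count) per key, then derive the average.
    val averages = pairs
      .window(Seconds(10), Seconds(5))
      .transform(rdd => rdd.combineByKey(
        (v: Double) => (v, 1L),                                       // createCombiner
        (acc: (Double, Long), v: Double) => (acc._1 + v, acc._2 + 1), // mergeValue
        (a: (Double, Long), b: (Double, Long)) => (a._1 + b._1, a._2 + b._2) // mergeCombiners
      ))
      .mapValues { case (sum, count) => sum / count }

    averages.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Note that transform recomputes the aggregation over the full window on every slide; reduceByKeyAndWindow with an inverse reduce function can instead update the previous window's result incrementally, which is the optimization being given up here.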