reduceByKey 在火花流中不起作用

reduceByKey doesn't work in spark streaming

我有以下代码片段,其中 reduceByKey 似乎不起作用。

val myKafkaMessageStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topicsSet, kafkaParams)
)

myKafkaMessageStream
  .foreachRDD { rdd => 
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    val myIter = rdd.mapPartitionsWithIndex { (i, iter) =>
      val offset = offsetRanges(i)
      iter.map(item => {
        (offset.fromOffset, offset.untilOffset, offset.topic, offset.partition, item)
      })
    }

    val myRDD = myIter.filter( (<filter_condition>) ).map(row => {
      //Process row

      ((field1, field2, field3) , (field4, field5))
    })

    val result = myRDD.reduceByKey((a,b) => (a._1+b._1, a._2+b._2))

    result.foreachPartition { partitionOfRecords =>
      //I don't get the reduced result here
      val connection = createNewConnection()
      partitionOfRecords.foreach(record => connection.send(record))
      connection.close()
    }        
  }

我是不是漏掉了什么?

在流式传输的情况下,使用 reduceByKeyAndWindow 对我来说更有意义,它可以满足您的需求,但要在特定的时间范围内。

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

"When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks."

http://spark.apache.org/docs/latest/streaming-programming-guide.html