Calculating an average in Spark Streaming not working: issue with updateStateByKey and instantiating a class

In a streaming context, I want to read stock tickers and values fed in through the Linux nc command and return the average every minute or so.

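The values come in as (String, Double), i.e. a ticker followed by a dollar amount such as 20. My design is to take these values (one string and one value per line) and instantiate a "Visit" class with them, then use that class (together with a "VisitState" class that does the calculation) in my updateStateByKey streaming method.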

Here is what I tried:

    // Create a ReceiverInputDStream on target ip:port and count the
    // words in input stream of \n delimited test (eg. generated by 'nc')
    val  lines:ReceiverInputDStream = ssc.socketTextStream(args(0), args(1).toInt)

    val words:RDD[String] = lines.flatMap(_.split(" "))  
    val wordDstream :DStream[(String, Double)] = words.map(x => (x(1).toString, x(2).toDouble))

    // Update function that will compute state after each dstream ingestion
    val update = (vals: Seq[Double], state: Option[VisitState]) => {
      val prev = state.getOrElse(VisitState())
      val currentCount = prev.count + vals.size
      val newAvg = (prev.average * prev.count + vals.sum) / currentCount
      Some(new VisitState(currentCount, newAvg))
    }
    val state: DStream[(String, VisitState)] = wordDstream.updateStateByKey[VisitState](update)

Here are my classes:

    case class Visit(label: String, marketValue: Int)
      extends Serializable
    {
      private var id : String = ""
      private var value : Int = 0
      this.id = label.toString
      this.value = marketValue

      override def toString():String = {label+s", value=$marketValue"}
    }

    case class VisitState(count: Double = 0L, average: Double = 0L) {

    }

I have 2 problems:

Any help? Cheers, Matt

For the first error, I would not use flatMap, just map:

    // Keep one array of fields per input line instead of flattening all tokens
    val words = lines.map(line => line.split(" "))
    val wordsTuple = words.map(line => (line(0).toString, line(1).toDouble))
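
To see why map works here and flatMap does not, the following is a minimal sketch that uses plain Scala collections in place of a DStream (the collection methods behave the same way for this purpose). The sample lines and ticker names are made up for illustration. After flatMap every element is a single token, so indexing into it as in the question, `x(1)`, picks out one character of that token rather than a field of the line.

```scala
object FlatMapVsMap {
  // Hypothetical sample input: one "ticker value" pair per line
  val lines: Seq[String] = Seq("AAPL 20.0", "GOOG 35.5")

  // flatMap flattens all tokens into one sequence, losing the per-line pairing
  val flat: Seq[String] = lines.flatMap(_.split(" "))

  // map keeps one Array of fields per line
  val perLine: Seq[Array[String]] = lines.map(_.split(" "))

  // With the fields still grouped, index 0 is the ticker and index 1 the value
  val pairs: Seq[(String, Double)] =
    perLine.map(fields => (fields(0), fields(1).toDouble))

  def main(args: Array[String]): Unit = {
    println(flat)
    println(pairs)
  }
}
```

Note also that the indices start at 0, so the ticker is `fields(0)` and the value is `fields(1)`.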

For the second point, you could do this:

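    // Note: Visit.marketValue is an Int in the question; this assumes it is changed to Double
    val visits = wordsTuple.map(line => (line._1, new Visit(line._1, line._2)))
    def update(values: Seq[Visit], state: Option[VisitState]): Option[VisitState] = {
         ...
    }
    val visitStates = visits.updateStateByKey[VisitState](update)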
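
One possible body for such an update function is sketched below, as an illustration rather than the only way to write it. It runs without Spark, so the logic can be checked on its own. The `Visit` and `VisitState` definitions here are simplified, hypothetical variants of the ones in the question (a `Double` market value and a `Long` count), and the sample values are made up. The function has the `(Seq[V], Option[S]) => Option[S]` shape that `updateStateByKey` expects.

```scala
// Hypothetical, simplified variants of the question's classes
case class Visit(label: String, marketValue: Double)
case class VisitState(count: Long = 0L, average: Double = 0.0)

object RunningAverage {
  // Folds the new values for one key into the previous running average
  def update(values: Seq[Visit], state: Option[VisitState]): Option[VisitState] = {
    val prev = state.getOrElse(VisitState())
    val newCount = prev.count + values.size
    if (newCount == 0) {
      // Nothing seen yet for this key: avoid dividing by zero
      Some(prev)
    } else {
      // Rebuild the previous total from average * count, then add the new values
      val total = prev.average * prev.count + values.map(_.marketValue).sum
      Some(VisitState(newCount, total / newCount))
    }
  }

  def main(args: Array[String]): Unit = {
    val s1 = update(Seq(Visit("AAPL", 10.0), Visit("AAPL", 20.0)), None)
    val s2 = update(Seq(Visit("AAPL", 30.0)), s1)
    println(s1) // Some(VisitState(2,15.0))
    println(s2) // Some(VisitState(3,20.0))
  }
}
```

Storing the count as a `Long` is a design choice of this sketch. It keeps the count exact and leaves floating-point arithmetic to the average alone.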