Calculating an average in Spark Streaming is not working: issue with updateStateByKey and instantiating a class
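In a streaming context, I want to take stock symbols and values fed in through the Linux nc command and return the average every minute or so.
The values arrive in (String, Double) format, e.g. a value of $20.
My design is to take these values (one string and one value per line) and instantiate a "Visit" class with them, then use that class (together with a "VisitState" class that does the calculation) in my updateStateByKey streaming method.
Here is what I tried: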
// Create a ReceiverInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
val lines:ReceiverInputDStream = ssc.socketTextStream(args(0), args(1).toInt)
val words:RDD[String] = lines.flatMap(_.split(" "))
val wordDstream :DStream[(String, Double)] = words.map(x => (x(1).toString, x(2).toDouble))
// Update function that will compute state after each dstream ingestion
val update = (vals: Seq[Double], state: Option[VisitState]) => {
val prev = state.getOrElse(VisitState())
val currentCount = prev.count + vals.size
val newAvg = (prev.average * prev.count + vals.sum)/currentCount
Some(new VisitState(currentCount, newAvg))
}
val state:DStream[(String, VisitState)] = wordDstream.updateStateByKey[VisitState](update)
Here are my classes:
case class Visit(label: String, marketValue: Int)
extends Serializable
{
private var id : String = ""
private var value : Int = 0
this.id = label.toString
this.value = marketValue
override def toString():String = {label+s", value=$marketValue"}
}
and
case class VisitState(count: Double = 0L, average: Double = 0L) {
}
I have 2 problems:
A compile error: value updateStateByKey is not a member of org.apache.spark.streaming.dstream.DStream[Visit]
[error] val state:DStream[(String, VisitState)] = wordDstream.updateStateByKey[VisitState](update)
I really don't know how to instantiate my class with values in Scala.
Any help?
Cheers,
Matt
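For the first error, I would not use flatMap but just map. flatMap flattens the split tokens into a single stream of strings, so the symbol and value of a line are no longer together; map keeps one array per line:
// Each element is the array of fields from one input line: index 0 is the symbol, index 1 the value
val words = lines.map(line => line.split(" "))
val wordsTuple = words.map(line => (line(0), line(1).toDouble))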
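The difference between the two calls can be seen without Spark. The following is only a minimal sketch using plain Scala collections in place of the DStream; the sample lines and the name ParseDemo are made up for illustration and are not from the question.

```scala
object ParseDemo {
  // Hypothetical sample input: one "SYMBOL value" pair per line, as nc might send it.
  val lines: Seq[String] = Seq("GOOG 20.0", "AAPL 31.5")

  // flatMap flattens every token into one sequence, so the pairing of
  // symbol and value within a line is lost.
  val flattened: Seq[String] = lines.flatMap(_.split(" "))

  // map keeps one Array per line, so index 0 is the symbol and index 1 the value.
  val pairs: Seq[(String, Double)] =
    lines.map(_.split(" ")).map(fields => (fields(0), fields(1).toDouble))
}
```

On a DStream the same two calls should behave analogously, which is why indexing into the result of flatMap does not give you the fields of a line.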
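For the second point, you can do something like this, keeping the symbol as the key so that updateStateByKey (which is only defined on streams of key/value pairs) is available:
// Key each Visit by its symbol; this assumes Visit.marketValue is declared as Double rather than Int
val visits = wordsTuple.map(line => (line._1, new Visit(line._1, line._2)))
def update(values: Seq[Visit], state: Option[VisitState]): Option[VisitState] = {
...
}
val visitStates = visits.updateStateByKey[VisitState](update _)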
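One way the body of update could look is sketched below. It is not necessarily what the answer had in mind: it reuses the running-average arithmetic from the question, and it assumes marketValue is a Double and count is a Long, both of which differ from the classes in the question. The object name RunningAverage is made up, and the code has no Spark dependency so it can be tried on its own.

```scala
object RunningAverage {
  // Assumption: marketValue is a Double here (the question declares it as Int).
  case class Visit(label: String, marketValue: Double)
  // Assumption: count is a Long here (the question declares it as Double).
  case class VisitState(count: Long = 0L, average: Double = 0.0)

  // Same shape as the function updateStateByKey expects:
  // (new values for a key, previous state) => new state.
  def update(values: Seq[Visit], state: Option[VisitState]): Option[VisitState] = {
    val prev = state.getOrElse(VisitState())
    val newCount = prev.count + values.size
    if (newCount == 0) {
      // No data at all for this key: keep the state and avoid dividing by zero.
      Some(prev)
    } else {
      // Rebuild the running total from the previous average, then add the new values.
      val total = prev.average * prev.count + values.map(_.marketValue).sum
      Some(VisitState(newCount, total / newCount))
    }
  }
}
```

With Spark on the classpath this would be passed as visits.updateStateByKey[VisitState](RunningAverage.update _). Note that stateful operations such as updateStateByKey also need a checkpoint directory to be set with ssc.checkpoint(...).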