How to pass two values as State in Spark Streaming?
I am trying to implement a Spark Streaming application that reads streaming data from Kafka. The stream consists of (key, value) pairs of the form "String,int", and I want to compute the average value for each key.
The data looks like this:
x,20
y,10
z,3
...
I want to compute the average per key in a stateful way, so I intended to store both the running sum of the values and the number of occurrences of each key in the State inside the mapping function:
def mappingFunc(key: String, value: Option[Double], state: State[Double], count: State[Int]): (String, Double) = {
  val sum = value.getOrElse(0.0) + state.getOption.getOrElse(0.0)
  val cnt = count.getOption.getOrElse(1) + 1
  state.update(sum)
  count.update(cnt)
  val output = (key, sum/cnt)
  output
}
The compiler reports the following error:
[error] /Users/Rabbit/Desktop/KTH_Second_Year/Periods/P1/Data-intensive_Computing/Lab_Assignment/lab3/src/sparkstreaming/KafkaSpark.scala:78: wrong number of type parameters for overloaded method value function with alternatives:
[error] [KeyType, ValueType, StateType, MappedType](mappingFunction: org.apache.spark.api.java.function.Function3[KeyType,org.apache.spark.api.java.Optional[ValueType],org.apache.spark.streaming.State[StateType],MappedType])org.apache.spark.streaming.StateSpec[KeyType,ValueType,StateType,MappedType] <and>
[error] [KeyType, ValueType, StateType, MappedType](mappingFunction: org.apache.spark.api.java.function.Function4[org.apache.spark.streaming.Time,KeyType,org.apache.spark.api.java.Optional[ValueType],org.apache.spark.streaming.State[StateType],org.apache.spark.api.java.Optional[MappedType]])org.apache.spark.streaming.StateSpec[KeyType,ValueType,StateType,MappedType] <and>
[error] [KeyType, ValueType, StateType, MappedType](mappingFunction: (KeyType, Option[ValueType], org.apache.spark.streaming.State[StateType]) => MappedType)org.apache.spark.streaming.StateSpec[KeyType,ValueType,StateType,MappedType] <and>
[error] [KeyType, ValueType, StateType, MappedType](mappingFunction: (org.apache.spark.streaming.Time, KeyType, Option[ValueType], org.apache.spark.streaming.State[StateType]) => Option[MappedType])org.apache.spark.streaming.StateSpec[KeyType,ValueType,StateType,MappedType]
How can I pass both the sum and the count of the values as state in Spark Streaming?
You need to combine the sum and the count into a single tuple (Double, Int) and store that tuple in the state. The following snippet should do the trick:
def mappingFunc(key: String, value: Option[Double], state: State[(Double, Int)]): (String, Double) = {
  val (sum, cnt) = state.getOption.getOrElse((0.0, 0))
  val newSum = value.getOrElse(0.0) + sum
  val newCnt = cnt + 1
  state.update((newSum, newCnt))
  (key, newSum/newCnt)
}