状态函数内部的迭代器为 Empty
Iterator inside state function coming as Empty
我有这样的案例 类。我正在使用 RateStreamSource 生成测试数据。它给了我一个数据集。现在我对数据集 groupByKey
进行分组并调用 mapGroupsWithState
.
但是在状态函数中 updateRateAnother
有一些逻辑,我正在打印迭代器。迭代器在方法中总是以 Empty 出现,我的逻辑不起作用。
以下是代码的最小可重现示例
case class Employee(id: String, value: Long)
case class Rate(timestamp: Timestamp, value: Long)
case class Rate2(timestamp: Timestamp, value: Long, age: Int)
object ResourceConfigConsolidator {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession
.builder()
.appName("TestJob")
.getOrCreate()
import sparkSession.implicits._
val rate = 2
val randoms = List(10, 20, 30, 40, 50, 60, 70)
def randomElement = Random.shuffle(randoms).head
val rcConfigDS = sparkSession
.readStream
.format("rate") // <-- use RateStreamSource
.option("rowsPerSecond", rate)
.load()
.as[Rate].filter(_.value % 40 == 0).map {
r => Rate2(r.timestamp, r.value, randomElement)
}
def updateRateAnother(key: Int, values: Iterator[Rate2], state:
GroupState[Employee]): Option[Employee] = {
println("key is here ::" + key)
if (state.hasTimedOut) {
// We've timed out, lets extract the state and send it down the stream
state.remove()
state.getOption
} else {
println("the iterating values ::::" + values.toList.mkString(" , \n"))
println("hello length ::::" + values.length)
if (!state.exists) {
if (values.length == 0) {
None
} else {
val latestValue = values.toList.maxBy(_.value)
val employee = Employee(latestValue.value.toString, latestValue.value)
state.update(employee)
Some(employee)
}
} else {
if (values.isEmpty) {
val currentState = state.get
Some(currentState)
} else {
val latestValue = values.toList.maxBy(_.value)
val currentState = state.get
val updated = currentState.copy(latestValue.value.toString, latestValue.value)
state.update(currentState.copy(latestValue.value.toString, latestValue.value))
Some(updated)
}
}
}
}
val res: Dataset[Employee] = rcConfigDS.groupByKey(_.age).
mapGroupsWithState(GroupStateTimeout.NoTimeout())(updateRateAnother).flatMap(emp =>
emp)
res.writeStream.format("console")
.outputMode(OutputMode.Update())
.option("truncate", value = false)
.option("checkpointLocation", "checkpoint1")
.start()
}
}
因为我是用 age
分组的,迭代器中应该至少有一个对象。我这样说对吗?为什么迭代器是空的?
你确定它是空的吗打印的时候?因为这是唯一令人惊讶的事情。你只能通过一个迭代器一次,所以一旦你第一次做 values.toList
,它 就变成空的 。您应该将 toList
的结果分配给一个变量,并丢弃迭代器。
更好的是,改变你的逻辑,这样你只需要一次传递,并摆脱 toList
(你可以直接在 Iterator
上调用 maxBy
...但只需要一次).这个想法是在处理大型数据集时不要将所有数据一次加载到内存中。
我有这样的案例 类。我正在使用 RateStreamSource 生成测试数据。它给了我一个数据集。现在我对数据集 groupByKey
进行分组并调用 mapGroupsWithState
.
但是在状态函数中 updateRateAnother
有一些逻辑,我正在打印迭代器。迭代器在方法中总是以 Empty 出现,我的逻辑不起作用。
以下是代码的最小可重现示例
case class Employee(id: String, value: Long)
case class Rate(timestamp: Timestamp, value: Long)
case class Rate2(timestamp: Timestamp, value: Long, age: Int)
object ResourceConfigConsolidator {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession
.builder()
.appName("TestJob")
.getOrCreate()
import sparkSession.implicits._
val rate = 2
val randoms = List(10, 20, 30, 40, 50, 60, 70)
def randomElement = Random.shuffle(randoms).head
val rcConfigDS = sparkSession
.readStream
.format("rate") // <-- use RateStreamSource
.option("rowsPerSecond", rate)
.load()
.as[Rate].filter(_.value % 40 == 0).map {
r => Rate2(r.timestamp, r.value, randomElement)
}
def updateRateAnother(key: Int, values: Iterator[Rate2], state:
GroupState[Employee]): Option[Employee] = {
println("key is here ::" + key)
if (state.hasTimedOut) {
// We've timed out, lets extract the state and send it down the stream
state.remove()
state.getOption
} else {
println("the iterating values ::::" + values.toList.mkString(" , \n"))
println("hello length ::::" + values.length)
if (!state.exists) {
if (values.length == 0) {
None
} else {
val latestValue = values.toList.maxBy(_.value)
val employee = Employee(latestValue.value.toString, latestValue.value)
state.update(employee)
Some(employee)
}
} else {
if (values.isEmpty) {
val currentState = state.get
Some(currentState)
} else {
val latestValue = values.toList.maxBy(_.value)
val currentState = state.get
val updated = currentState.copy(latestValue.value.toString, latestValue.value)
state.update(currentState.copy(latestValue.value.toString, latestValue.value))
Some(updated)
}
}
}
}
val res: Dataset[Employee] = rcConfigDS.groupByKey(_.age).
mapGroupsWithState(GroupStateTimeout.NoTimeout())(updateRateAnother).flatMap(emp =>
emp)
res.writeStream.format("console")
.outputMode(OutputMode.Update())
.option("truncate", value = false)
.option("checkpointLocation", "checkpoint1")
.start()
}
}
因为我是用 age
分组的,迭代器中应该至少有一个对象。我这样说对吗?为什么迭代器是空的?
你确定它是空的吗打印的时候?因为这是唯一令人惊讶的事情。你只能通过一个迭代器一次,所以一旦你第一次做 values.toList
,它 就变成空的 。您应该将 toList
的结果分配给一个变量,并丢弃迭代器。
更好的是,改变你的逻辑,这样你只需要一次传递,并摆脱 toList
(你可以直接在 Iterator
上调用 maxBy
...但只需要一次).这个想法是在处理大型数据集时不要将所有数据一次加载到内存中。