状态函数内部的迭代器为 Empty

Iterator inside state function coming as Empty

我有这样的案例 类。我正在使用 RateStreamSource 生成测试数据。它给了我一个数据集。现在我对数据集 groupByKey 进行分组并调用 mapGroupsWithState.

但是在状态函数中 updateRateAnother 有一些逻辑,我正在打印迭代器。迭代器在方法中总是以 Empty 出现,我的逻辑不起作用。

以下是代码的最小可重现示例

case class Employee(id: String, value: Long)
case class Rate(timestamp: Timestamp, value: Long)
case class Rate2(timestamp: Timestamp, value: Long, age: Int)

object ResourceConfigConsolidator {

   def main(args: Array[String]): Unit = {

       val sparkSession = SparkSession
                         .builder()
                         .appName("TestJob")
                         .getOrCreate()

      import sparkSession.implicits._
      val rate = 2
      val randoms = List(10, 20, 30, 40, 50, 60, 70)
      def randomElement = Random.shuffle(randoms).head
      val rcConfigDS = sparkSession
                        .readStream
                        .format("rate") // <-- use RateStreamSource
                        .option("rowsPerSecond", rate)
                        .load()
                        .as[Rate].filter(_.value % 40 == 0).map {
                          r => Rate2(r.timestamp, r.value, randomElement)
                        }


 def updateRateAnother(key: Int, values: Iterator[Rate2], state: 
                              GroupState[Employee]): Option[Employee] = {
     println("key is here ::" + key)
     if (state.hasTimedOut) {
    // We've timed out, lets extract the state and send it down the stream
    state.remove()
    state.getOption
  } else {
    println("the iterating values ::::" + values.toList.mkString(" , \n"))
    println("hello length ::::" + values.length)
    if (!state.exists) {
      if (values.length == 0) {
        None
      } else {
        val latestValue = values.toList.maxBy(_.value)
        val employee = Employee(latestValue.value.toString, latestValue.value)
        state.update(employee)
        Some(employee)
      }
    } else {
      if (values.isEmpty) {
        val currentState = state.get
        Some(currentState)
      } else {
        val latestValue = values.toList.maxBy(_.value)
        val currentState = state.get
        val updated = currentState.copy(latestValue.value.toString, latestValue.value)
        state.update(currentState.copy(latestValue.value.toString, latestValue.value))
        Some(updated)
       }
     }
    }
   }

  val res: Dataset[Employee] = rcConfigDS.groupByKey(_.age).
  mapGroupsWithState(GroupStateTimeout.NoTimeout())(updateRateAnother).flatMap(emp => 
  emp)

  res.writeStream.format("console")
  .outputMode(OutputMode.Update())
  .option("truncate", value = false)
  .option("checkpointLocation", "checkpoint1")
  .start()

 }
}

因为我是用 age 分组的,迭代器中应该至少有一个对象。我这样说对吗?为什么迭代器是空的?

你确定它是空的吗打印的时候?因为这是唯一令人惊讶的事情。你只能通过一个迭代器一次,所以一旦你第一次做 values.toList,它 就变成空的 。您应该将 toList 的结果分配给一个变量,并丢弃迭代器。

更好的是,改变你的逻辑,这样你只需要一次传递,并摆脱 toList(你可以直接在 Iterator 上调用 maxBy ...但只需要一次).这个想法是在处理大型数据集时不要将所有数据一次加载到内存中。