mapWithState().timeout() 触发一些功能

mapWithState().timeout() to trigger some function

我想使用 mapWithState().timeout() 函数作为触发其他函数的事件。有办法吗?

我已经阅读了一些 statesnapshot() 来查找 timeout 日志消息,但没有找到足够的信息。那会是我问题的某种答案的开始吗?在哪里可以找到更多相关信息?

I would like to use the mapWithState().timeout() function as an event to trigger some other function. Is there a way to do so

是的,这是可能的。您可以做的是定义一个发生超时后要调用的操作。您如何知道 MapWithStateRDD 何时发生超时?你可以看看State[S].isTimingOut()方法。一旦超时到期,此方法将产生 true,并且 mapWithState 将最后一次执行 StateSpec 方法,其中 value 设置为 None:

object Foo {
  def main(args: Array[String]): Unit = {
    val spec = StateSpec.function(updateState _).timeout(Milliseconds(5))
    // Use spec to invoke `mapWithState`
  }

  def updateState(key: Int, value: Option[Int], state: State[Int]): Option[Int] = {
    value match {
      case Some(number) => Some(number + 1)
      case _ if state.isTimingOut() => // Trigger Code Here
  }
}