mapWithState().timeout() 触发一些功能
mapWithState().timeout() to trigger some function
我想使用 mapWithState().timeout()
函数作为触发其他函数的事件。有办法吗?
我已经阅读了一些 statesnapshot()
来查找 timeout
日志消息,但没有找到足够的信息。那会是我问题的某种答案的开始吗?在哪里可以找到更多相关信息?
I would like to use the mapWithState().timeout()
function as an event
to trigger some other function. Is there a way to do so
是的,这是可能的。您可以做的是定义一个发生超时后要调用的操作。您如何知道 MapWithStateRDD
何时发生超时?你可以看看State[S].isTimingOut()
方法。一旦超时到期,此方法将产生 true
,并且 mapWithState
将最后一次执行 StateSpec
方法,其中 value
设置为 None
:
object Foo {
def main(args: Array[String]): Unit = {
val spec = StateSpec.function(updateState _).timeout(Milliseconds(5))
// Use spec to invoke `mapWithState`
}
def updateState(key: Int, value: Option[Int], state: State[Int]): Option[Int] = {
value match {
case Some(number) => Some(number + 1)
case _ if state.isTimingOut() => // Trigger Code Here
}
}
我想使用 mapWithState().timeout()
函数作为触发其他函数的事件。有办法吗?
我已经阅读了一些 statesnapshot()
来查找 timeout
日志消息,但没有找到足够的信息。那会是我问题的某种答案的开始吗?在哪里可以找到更多相关信息?
I would like to use the
mapWithState().timeout()
function as an event to trigger some other function. Is there a way to do so
是的,这是可能的。您可以做的是定义一个发生超时后要调用的操作。您如何知道 MapWithStateRDD
何时发生超时?你可以看看State[S].isTimingOut()
方法。一旦超时到期,此方法将产生 true
,并且 mapWithState
将最后一次执行 StateSpec
方法,其中 value
设置为 None
:
object Foo {
def main(args: Array[String]): Unit = {
val spec = StateSpec.function(updateState _).timeout(Milliseconds(5))
// Use spec to invoke `mapWithState`
}
def updateState(key: Int, value: Option[Int], state: State[Int]): Option[Int] = {
value match {
case Some(number) => Some(number + 1)
case _ if state.isTimingOut() => // Trigger Code Here
}
}