处理不同的状态

Handle different states

我想知道是否可以在应用程序中保持完全不同的状态?例如,让第一个状态的 update function 调用第二个状态的那个?

我不记得经历过任何这样的例子,我也没有找到任何相反的指示......根据 https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.html 的例子,我不知道为什么我不能拥有不同的 trackStateFunc 具有不同的 State,并且由于它们的 Key 仍然更新它们,如下所示:

def firstTrackStateFunc(batchTime: Time, 
                        key: String, 
                        value: Option[Int], 
                        state: State[Long]): Option[(String, Long)] = {
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
    val output = (key, sum)
    state.update(sum)
    Some(output)
}

def secondTrackStateFunc(batchTime: Time, 
                         key: String, 
                         value: Option[Int], 
                         state: State[Int]): Option[(String, Long)] = {
    // disregard problems this example would cause
    val dif = value.getOrElse(0) - state.getOption.getOrElse(0L) 
    val output = (key, dif)
    state.update(dif)
    Some(output)
}

我认为这是可能的,但仍然不确定。我希望有人验证或推翻这个假设...

I was wondering if it was possible to maintain radically different states across an application?

DStream[(Key, Value)] 上对 mapWithState 的每次调用都可以包含一个 State[T] 对象。对于 mapWithState 的每次调用,此 T 都必须相同。为了使用不同的状态,您可以链接 mapWithState 调用,其中一个 Option[U] 是另一个输入,或者您可以拆分 DStream 并应用不同的 mapWithState 调用每一个。但是,您不能在另一个内部调用不同的 State[T] 对象,因为它们彼此隔离,并且一个不能改变另一个的状态。

@Yuval 对链式 mapWithState 函数给出了很好的答案。但是,我有另一种方法。您可以将 sum 和 diff 放在同一个 State[(Int, Int)].

中,而不是调用两个 mapWithState

在这种情况下,您只需要一个 mapWithState 函数,您可以在其中更新这两个东西。像这样:

def trackStateFunc(batchTime: Time, 
                   key: String, 
                   value: Option[Int], 
                   state: State[(Long, Int)]): Option[(String, (Long, Int))] =
{
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
    val dif = value.getOrElse(0) - state.getOption.getOrElse(0L)
    val output = (key, (sum, diff))
    state.update((sum, diff))
    Some(output)
}