处理不同的状态
Handle different states
我想知道是否可以在应用程序中保持完全不同的状态?例如,让第一个状态的 update function
调用第二个状态的那个?
我不记得经历过任何这样的例子,我也没有找到任何相反的指示......根据 https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.html 的例子,我不知道为什么我不能拥有不同的 trackStateFunc
具有不同的 State
,并且由于它们的 Key
仍然更新它们,如下所示:
def firstTrackStateFunc(batchTime: Time,
key: String,
value: Option[Int],
state: State[Long]): Option[(String, Long)] = {
val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
val output = (key, sum)
state.update(sum)
Some(output)
}
和
def secondTrackStateFunc(batchTime: Time,
key: String,
value: Option[Int],
state: State[Int]): Option[(String, Long)] = {
// disregard problems this example would cause
val dif = value.getOrElse(0) - state.getOption.getOrElse(0L)
val output = (key, dif)
state.update(dif)
Some(output)
}
我认为这是可能的,但仍然不确定。我希望有人验证或推翻这个假设...
I was wondering if it was possible to maintain radically different
states across an application?
在 DStream[(Key, Value)]
上对 mapWithState
的每次调用都可以包含一个 State[T]
对象。对于 mapWithState
的每次调用,此 T
都必须相同。为了使用不同的状态,您可以链接 mapWithState
调用,其中一个 Option[U]
是另一个输入,或者您可以拆分 DStream
并应用不同的 mapWithState
调用每一个。但是,您不能在另一个内部调用不同的 State[T]
对象,因为它们彼此隔离,并且一个不能改变另一个的状态。
@Yuval 对链式 mapWithState 函数给出了很好的答案。但是,我有另一种方法。您可以将 sum 和 diff 放在同一个 State[(Int, Int)].
中,而不是调用两个 mapWithState
在这种情况下,您只需要一个 mapWithState 函数,您可以在其中更新这两个东西。像这样:
def trackStateFunc(batchTime: Time,
key: String,
value: Option[Int],
state: State[(Long, Int)]): Option[(String, (Long, Int))] =
{
val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
val dif = value.getOrElse(0) - state.getOption.getOrElse(0L)
val output = (key, (sum, diff))
state.update((sum, diff))
Some(output)
}
我想知道是否可以在应用程序中保持完全不同的状态?例如,让第一个状态的 update function
调用第二个状态的那个?
我不记得经历过任何这样的例子,我也没有找到任何相反的指示......根据 https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.html 的例子,我不知道为什么我不能拥有不同的 trackStateFunc
具有不同的 State
,并且由于它们的 Key
仍然更新它们,如下所示:
def firstTrackStateFunc(batchTime: Time,
key: String,
value: Option[Int],
state: State[Long]): Option[(String, Long)] = {
val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
val output = (key, sum)
state.update(sum)
Some(output)
}
和
def secondTrackStateFunc(batchTime: Time,
key: String,
value: Option[Int],
state: State[Int]): Option[(String, Long)] = {
// disregard problems this example would cause
val dif = value.getOrElse(0) - state.getOption.getOrElse(0L)
val output = (key, dif)
state.update(dif)
Some(output)
}
我认为这是可能的,但仍然不确定。我希望有人验证或推翻这个假设...
I was wondering if it was possible to maintain radically different states across an application?
在 DStream[(Key, Value)]
上对 mapWithState
的每次调用都可以包含一个 State[T]
对象。对于 mapWithState
的每次调用,此 T
都必须相同。为了使用不同的状态,您可以链接 mapWithState
调用,其中一个 Option[U]
是另一个输入,或者您可以拆分 DStream
并应用不同的 mapWithState
调用每一个。但是,您不能在另一个内部调用不同的 State[T]
对象,因为它们彼此隔离,并且一个不能改变另一个的状态。
@Yuval 对链式 mapWithState 函数给出了很好的答案。但是,我有另一种方法。您可以将 sum 和 diff 放在同一个 State[(Int, Int)].
中,而不是调用两个 mapWithState在这种情况下,您只需要一个 mapWithState 函数,您可以在其中更新这两个东西。像这样:
def trackStateFunc(batchTime: Time,
key: String,
value: Option[Int],
state: State[(Long, Int)]): Option[(String, (Long, Int))] =
{
val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
val dif = value.getOrElse(0) - state.getOption.getOrElse(0L)
val output = (key, (sum, diff))
state.update((sum, diff))
Some(output)
}