Generic state management
This question is a follow-up to an earlier one.
I want to encapsulate the state management logic.
Here is where I currently stand:
class StateManager(
    stream: DStream[(String, String)],
    updateStateFunction: (String, Option[String], State[String]) => Option[(String, String)]
) {
  lazy val myState = stream.mapWithState(stateSpec).map(_.get)
  lazy val stateSpec = StateSpec.function(updateStateFunction)
}

object StateManager {
  def apply(
      _dStream: DStream[(String, String)],
      _updateState: (String, Option[String], State[String]) => Option[(String, String)]
  ) =
    new StateManager(_dStream, _updateState)
}
This works fine, but it only handles DStream[(String, String)]. It is a first step toward generic state management, which should welcome any DStream: from DStream[(Int, String)] to DStream[(String, myCustomClass)].
myState needs to be a function value for this to work (serialization).
But I hit a problem, because type parameters do not apply to function objects in Scala.
user6910411 gave me a hint by using ClassTags with an enclosing method, but in the end it is still a method.
Does anyone know how to overcome these difficulties?
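For illustration, here is the naive generic version I would like to write (the class name is just for the example); it does not compile, because without ClassTag evidence for the type parameters the implicit conversion that provides mapWithState on a pair DStream never applies:

import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

// Naive attempt: plain type parameters, no ClassTags.
// stream.mapWithState(stateSpec) does not compile here: the pair-DStream
// operations (and mapWithState's state/mapped types) need ClassTag
// evidence that T, U, V and W do not carry.
class NaiveStateManager[T, U, V, W](
    stream: DStream[(T, U)],
    updateStateFunction: (T, Option[U], State[V]) => Option[W]
) {
  lazy val myState = stream.mapWithState(stateSpec).map(_.get)
  lazy val stateSpec = StateSpec.function(updateStateFunction)
}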
Context:
Spark 1.6
The Spark graph:
object Consumer_Orchestrator {
  def main(args: Array[String]) = {
    // setup configurations
    val streamingEnvironment = StreamingEnvironment(/*configurations*/)
    val kafkaStream = streamingEnvironment.stream()

    val updateStateFunction: (String, Option[String], State[String]) => Option[(String, String)] = (key, value, state) => {/*some code*/}
    val initialState = emptyRDD // unused placeholder

    val stateManager = StateManager(kafkaStream, updateStateFunction)
    val state: DStream[(String, String)] = stateManager.myState
    state.foreachRDD(_.foreach(println))

    streamingEnvironment.streamingContext.start()
    streamingEnvironment.streamingContext.awaitTermination()
  }
}
The StreamingEnvironment class that creates the Streaming context:
class StreamingEnvironment(sparkConf: SparkConf, kafkaConf: KafkaConf) {
  val sparkContext = SparkContext.getOrCreate(sparkConf)

  lazy val streamingContext = new StreamingContext(sparkContext, Seconds(30))
  streamingContext.checkpoint(/*directory checkpoint*/)
  streamingContext.remember(Minutes(1))

  def stream() = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaConf.mBrokers, kafkaConf.mTopics)

  def stop() = sparkContext.stop()
}

object StreamingEnvironment {
  def apply(kafkaConf: KafkaConf) = {
    val sparkConf = new SparkConf
    new StreamingEnvironment(sparkConf, kafkaConf)
  }
}
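KafkaConf itself is not shown here; a hypothetical minimal shape, consistent with the mBrokers and mTopics accesses above and with the Spark 1.6 createDirectStream signature, could look like this:

// Assumed for the example: a kafkaParams map and a topic set, as expected
// by KafkaUtils.createDirectStream(ssc, kafkaParams, topics) in Spark 1.6.
case class KafkaConf(mBrokers: Map[String, String], mTopics: Set[String])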
Here you are:
App.scala:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.ConstantInputDStream
import statemanager._

object App {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[*]", "generic", new SparkConf())
    val ssc = new StreamingContext(sc, Seconds(10))
    ssc.checkpoint("/tmp/chk")

    StateManager(
      new ConstantInputDStream(ssc, sc.parallelize(Seq(("a", 1), ("b", 2)))),
      (_: String, _: Option[Int], _: State[Int]) => Option(1)
    ).myState.print

    ssc.start()
    ssc.awaitTermination()
  }
}
StateManage.scala:
package statemanager

import scala.reflect.ClassTag
import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

class StateManager[T : ClassTag, U : ClassTag, V : ClassTag, W : ClassTag](
    stream: DStream[(T, U)],
    updateStateFunction: (T, Option[U], State[V]) => Option[W]
) {
  lazy val myState = stream.mapWithState(stateSpec).map(_.get)
  lazy val stateSpec = StateSpec.function(updateStateFunction)
}

object StateManager {
  def apply[T : ClassTag, U : ClassTag, V : ClassTag, W : ClassTag](
      _dStream: DStream[(T, U)],
      _updateState: (T, Option[U], State[V]) => Option[W]
  ) =
    new StateManager(_dStream, _updateState)
}
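As a side note, a T : ClassTag context bound is just syntactic sugar for an implicit ClassTag parameter; a desugared equivalent of the class above (the class name and parameter names are illustrative) would be:

// What the context bounds desugar to: the implicit ClassTag values are
// the evidence that mapWithState and the pair-DStream conversion require.
class StateManagerDesugared[T, U, V, W](
    stream: DStream[(T, U)],
    updateStateFunction: (T, Option[U], State[V]) => Option[W]
)(implicit tT: ClassTag[T], tU: ClassTag[U], tV: ClassTag[V], tW: ClassTag[W]) {
  lazy val myState = stream.mapWithState(stateSpec).map(_.get)
  lazy val stateSpec = StateSpec.function(updateStateFunction)
}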
build.sbt:
scalaVersion := "2.11.8"
val sparkVersion = "2.1.0"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-streaming" % sparkVersion
)
Directory structure:
├── App.scala
├── build.sbt
└── StateManage.scala
Example run:
sbt run
...
-------------------------------------------
Time: 1483701790000 ms
-------------------------------------------
1
1
...
As you can see, there is no magic here. If you introduce generic arguments, you need ClassTags in the same context.
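To see the genericity in action, here is a sketch that pushes a custom value class through the same StateManager (the Visit class and the counting logic are invented for the example, and it assumes the sc/ssc setup from App.scala above):

case class Visit(page: String, count: Int)

// Hypothetical usage: DStream[(String, Visit)] with Int state.
// The four ClassTags (String, Visit, Int, (String, Int)) are materialized
// automatically at the call site.
StateManager(
  new ConstantInputDStream(ssc, sc.parallelize(Seq(("user1", Visit("/home", 1))))),
  (key: String, value: Option[Visit], state: State[Int]) => {
    val total = state.getOption.getOrElse(0) + value.map(_.count).getOrElse(0)
    state.update(total) // accumulate visit counts per key
    Option((key, total))
  }
).myState.print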