Generic state management

This question is a follow-up to an earlier question.


I would like to encapsulate the state management logic.

The following is where I am right now:

class StateManager(
  stream: DStream[(String, String)],
  updateStateFunction: (String, Option[String], State[String]) => Option[(String, String)]
) {
  lazy val myState = stream.mapWithState(stateSpec).map(_.get)
  lazy val stateSpec = StateSpec.function(updateStateFunction)
}

object StateManager {
  def apply(
    _dStream: DStream[(String, String)],
    _updateState: (String, Option[String], State[String]) => Option[(String, String)]
  ) =
    new StateManager(_dStream, _updateState)
}

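This works fine, but it only handles DStream[(String, String)]. It is a first step towards generic state management that would accept any DStream, from DStream[(Int, String)] to DStream[(String, myCustomClass)].

myState needs to be a function value in order to work (serialization).

But I face a problem, because type parameters do not apply to function objects in Scala.

user6910411 gave me a hint by using ClassTags with an enclosing method, but in turn that would still be a method.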
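The limitation and the enclosing-method hint can be illustrated without Spark. This is only a minimal sketch: `EnclosingMethodSketch`, `stringArray`, and `arrayMaker` are hypothetical names chosen for illustration, not part of any library.

```scala
import scala.reflect.ClassTag

object EnclosingMethodSketch {
  // A function value has its types fixed where it is defined;
  // Scala 2 has no syntax for a `val` that takes a type parameter.
  val stringArray: Int => Array[String] = n => new Array[String](n)

  // A method can take a type parameter (with a ClassTag) and hand back
  // a function value once the type is chosen by the caller.
  def arrayMaker[A: ClassTag]: Int => Array[A] = n => new Array[A](n)

  def main(args: Array[String]): Unit = {
    // Fix the type parameter first, then use the result as a plain function value.
    val intArray: Int => Array[Int] = arrayMaker[Int]
    println(intArray(3).length) // prints 3
  }
}
```

The generic part stays a method, but what it returns is an ordinary function value with concrete types.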

Does anyone have an idea how to overcome these difficulties?


Context:

Spark 1.6

Spark graph:

object Consumer_Orchestrator {
    def main(args: Array[String]) = {
        //setup configurations

        val streamingContext = StreamingEnvironment(/*configurations*/)

        val kafkaStream = streamingContext.stream()

        val updateStateFunction: (String, Option[String], State[String]) => Option[(String, String)] = (key, value, state) => {/*some code*/}
        val initialState = emptyRDD

        val stateManager = StateManager(kafkaStream, updateStateFunction)
        val state: DStream[(String, String)] = stateManager.myState

        state.foreachRDD(_.foreach(println))

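        // streamingContext here is the StreamingEnvironment wrapper; start its inner StreamingContext
        streamingContext.streamingContext.start()
        streamingContext.streamingContext.awaitTermination()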
    }
}

The StreamingEnvironment class creates the stream:

class StreamingEnvironment(sparkConf: SparkConf, kafkaConf: KafkaConf) {
    val sparkContext = spark.SparkContext.getOrCreate(sparkConf)
    lazy val streamingContext = new StreamingContext(sparkContext, Seconds(30))

    streamingContext.checkpoint(/*directory checkpoint*/)
    streamingContext.remember(Minutes(1))

    def stream() = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaConf.mBrokers, kafkaConf.mTopics)
    def stop() = sparkContext.stop()
}

object StreamingEnvironment {
    def apply(kafkaConf: KafkaConf) = {
        val sparkConf = new SparkConf

        new StreamingEnvironment(sparkConf, kafkaConf)
    }
}

Here you are:

  • App.scala:

    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.dstream.ConstantInputDStream
    import statemanager._
    
    object App {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local[*]", "generic", new SparkConf())
        val ssc = new StreamingContext(sc, Seconds(10))
        ssc.checkpoint("/tmp/chk")
    
        StateManager(
          new ConstantInputDStream(ssc, sc.parallelize(Seq(("a", 1), ("b",2)))),
          (_: String, _: Option[Int], _: State[Int]) =>  Option(1)
        ).myState.print
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
  • StateManage.scala:

    package statemanager
    
    import scala.reflect.ClassTag
    import org.apache.spark.streaming.{State, StateSpec}
    import org.apache.spark.streaming.dstream.DStream
    
    class StateManager[T : ClassTag, U : ClassTag, V : ClassTag, W : ClassTag](
      stream: DStream[(T, U)],
      updateStateFunction: (T, Option[U], State[V]) => Option[W]
    ) {
      lazy val myState = stream.mapWithState(stateSpec).map(_.get)
      lazy val stateSpec = StateSpec.function(updateStateFunction)
    }
    
    object StateManager {
      def apply[T : ClassTag, U : ClassTag, V : ClassTag, W : ClassTag](
        _dStream: DStream[(T, U)],
        _updateState: (T, Option[U], State[V]) => Option[W]
      ) =
        new StateManager(_dStream, _updateState)
    }
    
  • build.sbt:

    scalaVersion := "2.11.8"
    
    val sparkVersion = "2.1.0"
    
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % sparkVersion,
      "org.apache.spark" %% "spark-streaming" % sparkVersion
    )
    
  • Directory structure:

    ├── App.scala
    ├── build.sbt
    └── StateManage.scala
    
  • Example execution:

    sbt run
    ...
    -------------------------------------------
    Time: 1483701790000 ms
    -------------------------------------------
    1
    1
    ...
    

As you can see, there is no magic here. If you introduce generic parameters, you need ClassTags in the same context.
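To see why the ClassTags have to be declared in the same context, here is a minimal sketch that needs no Spark. `Lib.toArray` is a hypothetical stand-in for a library method that, like `mapWithState`, asks for an implicit ClassTag for its type parameter; `Wrapper` plays the role of `StateManager`. All names are made up for illustration.

```scala
import scala.reflect.ClassTag

// Hypothetical library method that requires a ClassTag for A.
object Lib {
  def toArray[A: ClassTag](xs: Seq[A]): Array[A] = xs.toArray
}

// The context bound `T: ClassTag` makes a ClassTag[T] available inside the class.
// Without it, the call to Lib.toArray would not compile, because T is
// abstract here and the compiler cannot materialize a ClassTag for it.
class Wrapper[T: ClassTag](xs: Seq[T]) {
  lazy val arr: Array[T] = Lib.toArray(xs)
}

object Wrapper {
  // The factory is generic too, so it needs the same bound
  // in order to pass the ClassTag on to the constructor.
  def apply[T: ClassTag](xs: Seq[T]): Wrapper[T] = new Wrapper(xs)
}

object Demo {
  def main(args: Array[String]): Unit = {
    // At the call site T is the concrete type Int, so the compiler supplies the ClassTag.
    val w = Wrapper(Seq(1, 2, 3))
    println(w.arr.sum) // prints 6
  }
}
```

The ClassTag is only materialized where the type is concrete (the call in `Demo`), and each generic layer in between has to declare the bound to forward it.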