Idiomatic way to turn an Akka Source into a Spark InputDStream

I'm actually trying to do the opposite of what is asked in the linked question; that is, use a Source[A] to push elements into an InputDStream[A].

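So far I've managed to put together an implementation that uses a Feeder actor and a Receiver actor, similar to the ActorWordCount example, but it seems rather complex, so I'm wondering whether there is a simpler way.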

Reference: http://spark.apache.org/docs/latest/streaming-custom-receivers.html

You could do something like this:

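import akka.stream.Materializer
import akka.stream.scaladsl.Source
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// Thrown from inside the stream to abort it once Spark has stopped the receiver
class StreamStopped extends RuntimeException("Stream stopped")

// Serializable factory class: the receiver is shipped to an executor,
// so it carries this recipe and builds the actual Source there
case class SourceFactory(start: Int, end: Int) {
  def source = Source(start to end).map(_.toString)
}

class CustomReceiver(sourceFactory: SourceFactory)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // A Materializer is not serializable, so create it lazily on the executor
  @transient implicit lazy val materializer: Materializer = ....

  def onStart(): Unit = {
    sourceFactory.source.runForeach { e =>
      if (isStopped()) {
        // Stop the source
        throw new StreamStopped
      } else {
        store(e)
      }
    }.onComplete {
      case Success(_)                => // source completed normally
      case Failure(_: StreamStopped) => // expected on shutdown: ignore
      case Failure(ex)               => reportError("Source exception", ex)
    }
  }

  // Nothing to do here: the running stream checks isStopped() on every element
  def onStop(): Unit = {}
}

val customReceiverStream = ssc.receiverStream(new CustomReceiver(SourceFactory(1, 100)))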
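The stop mechanism above throws a sentinel exception from inside the per-element callback and then ignores that one exception in the completion handler. The following stdlib-only sketch shows the same control flow without Akka or Spark. `FakeReceiver` is a hypothetical stand-in for Spark's `Receiver` (only `isStopped`, `store`, `onStop` and `reportError` are modelled), a plain `Iterator` stands in for the Akka `Source`, and a `Future` plays the role of the `Future[Done]` returned by `runForeach`.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Failure

// Sentinel exception, as in the answer: thrown only to abort the loop
class StreamStopped extends RuntimeException("Stream stopped")

// Hypothetical stand-in for Spark's Receiver; only the hooks used above are modelled
class FakeReceiver(elements: Iterator[String], stopAfter: Int) {
  @volatile private var stopped = false
  @volatile var reportedError: Option[Throwable] = None
  val stored = ArrayBuffer.empty[String]

  def isStopped: Boolean = stopped
  def onStop(): Unit = stopped = true
  def reportError(msg: String, t: Throwable): Unit = reportedError = Some(t)

  def store(e: String): Unit = {
    stored += e
    // Simulate Spark stopping the receiver after `stopAfter` elements
    if (stored.size == stopAfter) onStop()
  }

  // Same shape as CustomReceiver.onStart; the returned Future completes
  // only after the andThen handler has run
  def onStart(): Future[Unit] =
    Future {
      elements.foreach { e =>
        if (isStopped) throw new StreamStopped // abort the remaining elements
        else store(e)
      }
    }.andThen {
      case Failure(_: StreamStopped) => // expected on shutdown: ignore
      case Failure(ex)               => reportError("Source exception", ex)
    }
}

// Usage: stop after three elements; the remaining two are never stored
val r = new FakeReceiver(Iterator("1", "2", "3", "4", "5"), stopAfter = 3)
Await.ready(r.onStart(), 5.seconds)
println(r.stored.mkString(",")) // prints 1,2,3
```

One consequence of this design, visible in the sketch: the stop flag is only checked when the next element arrives, so an idle or slow source keeps running until it emits again.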

Edit: self-accepting after 5 days, since no good answers came in.

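I've extracted the actor-based implementation into a library, Sparkka-streams, which has worked for me so far. I'll update or deprecate the library once a better solution to this problem comes along.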

It is used like this:

// InputDStream can then be used to build elements of the graph that require integration with Spark
val (inputDStream, feedDInput) = Streaming.connection[Int]()
val source = Source.fromGraph(GraphDSL.create() { implicit builder =>

  import GraphDSL.Implicits._

  val source = Source(1 to 10)

  val bCast = builder.add(Broadcast[Int](2))
  val merge = builder.add(Merge[Int](2))

  val add1 = Flow[Int].map(_ + 1)
  val times3 = Flow[Int].map(_ * 3)
  source ~> bCast ~> add1 ~> merge
            bCast ~> times3 ~> feedDInput ~> merge

  SourceShape(merge.out)
})

val reducedFlow = source.runWith(Sink.fold(0)(_ + _))
whenReady(reducedFlow)(_ shouldBe 230)

val sharedVar = ssc.sparkContext.accumulator(0)
inputDStream.foreachRDD { rdd =>
  rdd.foreach { i =>
    sharedVar += i
  }
}
ssc.start()
eventually(sharedVar.value shouldBe 165)
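The expected values in the test (230 for the fold, 165 on the Spark side) follow from the graph's shape. A quick check with plain Scala collections, which only mirrors which elements flow where and ignores ordering, concurrency and the actual Akka/Spark machinery:

```scala
// Plain collections standing in for the Akka graph; only the data flow is modelled
val input        = (1 to 10).toList
val add1Branch   = input.map(_ + 1) // bCast ~> add1 ~> merge
val times3Branch = input.map(_ * 3) // bCast ~> times3 ~> feedDInput ~> merge

// Sink.fold(0)(_ + _) receives both branches after the Merge
val foldedSum = (add1Branch ++ times3Branch).sum
// Only the times3 branch passes through feedDInput, so only it reaches Spark
val sparkSum = times3Branch.sum

println(s"fold = $foldedSum, spark = $sparkSum") // prints fold = 230, spark = 165
```

Because addition is commutative, the order in which Merge interleaves elements, or in which Spark partitions add to the accumulator, does not affect either total.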