Idiomatic way to use Spark DStream as Source for an Akka stream

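I'm building a REST API that starts some computation in a Spark cluster and responds with a chunked stream of the results. Given a Spark stream with the computation results, I can use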

dstream.foreachRDD()

to send the data out of Spark. I'm using akka-http to send a chunked HTTP response:
val requestHandler: HttpRequest => HttpResponse = {
  case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) =>
    HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source))
}

For simplicity, I'm trying to get plain text working first and will add JSON marshalling later.

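But what is the idiomatic way to use a Spark DStream as the Source for an Akka stream? I figured I should be able to do it via a socket, but since the Spark driver and the REST endpoint run in the same JVM, opening a socket just for this seems like overkill.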

Edit: This answer only applies to older versions of Spark and Akka. PH88's answer is the correct approach for recent versions.

You can use an intermediate akka.actor.Actor to feed a Source. The solution below is not "reactive", because the underlying Actor has to maintain a buffer of RDD messages that may be dropped if the downstream HTTP client isn't consuming chunks quickly enough. This problem occurs regardless of the implementation details, because you cannot connect the "throttling" of Akka stream back-pressure to the DStream to slow the data down. The reason is that DStream does not implement org.reactivestreams.Publisher.
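To make the drop behaviour concrete, here is a minimal, library-free Scala sketch of a bounded buffer that discards its oldest element when a new one arrives while it is full. It only models the "Actor with buffer" idea and is not Akka code. DropOldestBuffer and DropOldestDemo are hypothetical names, and the capacity of 3 is arbitrary.

```scala
import scala.collection.mutable

// Hypothetical model of the Actor's buffer: a fixed-capacity FIFO that
// evicts the oldest element when a new one arrives and the buffer is full.
final class DropOldestBuffer[A](capacity: Int) {
  require(capacity > 0, "capacity must be positive")

  private val queue = mutable.Queue.empty[A]
  private var droppedCount = 0

  // Called by the producer (the DStream side). It never blocks, so the
  // producer is never slowed down: there is no back-pressure.
  def offer(elem: A): Unit = {
    if (queue.size == capacity) {
      queue.dequeue() // discard the oldest buffered element
      droppedCount += 1
    }
    queue.enqueue(elem)
  }

  // Called by the consumer (the HTTP client side) when it wants another chunk.
  def poll(): Option[A] =
    if (queue.isEmpty) None else Some(queue.dequeue())

  // Number of elements lost so far because the consumer fell behind.
  def dropped: Int = droppedCount
}

object DropOldestDemo {
  def main(args: Array[String]): Unit = {
    val buffer = new DropOldestBuffer[String](capacity = 3)
    // The producer emits 5 messages before the consumer reads anything.
    (1 to 5).foreach(i => buffer.offer(s"msg-$i"))
    val received =
      Iterator.continually(buffer.poll()).takeWhile(_.isDefined).map(_.get).toList
    println(received)       // List(msg-3, msg-4, msg-5)
    println(buffer.dropped) // 2
  }
}
```

The only knob is the capacity; anything the consumer doesn't pick up in time is lost. That is the missing link between the DStream and the HTTP client's back-pressure.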

The basic topology is:

DStream --> Actor with buffer --> Source

To build this topology, you have to create an Actor similar to the implementation here:

//JobManager definition is provided in the link
val actorRef = actorSystem actorOf JobManager.props 

Create a Source of ByteStrings (messages) based on the JobManager. Also, convert each ByteString to an HttpEntity.ChunkStreamPart, which is what the HttpResponse requires:

import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.{Flow, Source}
import akka.http.scaladsl.model.HttpEntity
import akka.util.ByteString

type Message = ByteString

val messageToChunkPart = 
  Flow[Message].map(HttpEntity.ChunkStreamPart(_))

//Actor with buffer --> Source
val source : Source[HttpEntity.ChunkStreamPart, Unit] = 
  Source(ActorPublisher[Message](actorRef)) via messageToChunkPart

Link the Spark DStream to the Actor so that each incoming RDD is converted to an Iterable of ByteString, which is then forwarded to the Actor:

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.rdd.RDD

//your stream of results (String is just an example element type)
val dstream : DStream[String] = ???

//This function converts your RDDs to messages being sent
//via the http response
def rddToMessages[T](rdd : RDD[T]) : Iterable[Message] = ???

def sendMessageToActor(message : Message) = actorRef ! message

//DStream --> Actor with buffer
dstream foreachRDD {rddToMessages(_) foreach sendMessageToActor}
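rddToMessages is left as ??? above. One possible shape for its formatting step assumes each batch is small enough to pull to the driver (for example with rdd.collect()). It emits one newline-terminated UTF-8 chunk per record, so a text/plain client can split the response back into records. The sketch below uses only the Scala standard library. BatchToChunks and toChunks are hypothetical names. In the answer's code, each resulting byte array would then be wrapped as akka.util.ByteString(bytes).

```scala
import java.nio.charset.StandardCharsets

// Hypothetical formatting step for rddToMessages. In the real code the input
// would be the driver-side copy of a batch, e.g. rdd.collect(), which is only
// safe when each batch fits comfortably in driver memory.
object BatchToChunks {
  // One newline-terminated UTF-8 chunk per record, so a text/plain client
  // can split the chunked response back into individual records.
  def toChunks[T](batch: Seq[T]): Iterable[Array[Byte]] =
    batch.map(record => (record.toString + "\n").getBytes(StandardCharsets.UTF_8))

  def main(args: Array[String]): Unit = {
    val chunks = toChunks(Seq(1, 2, 3))
    // Decode and print each chunk to show what the HTTP client would receive.
    chunks.foreach(c => print(new String(c, StandardCharsets.UTF_8)))
  }
}
```

Running main prints 1, 2 and 3 on separate lines. Choosing a newline as the record delimiter keeps plain text readable now. The same per-record framing still works once JSON marshalling replaces toString.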

Provide the Source to the HttpResponse:

val requestHandler: HttpRequest => HttpResponse = {
  case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) =>
    HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source))
}

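Note: there should be very little time/code between the dstream foreachRDD line and the HttpResponse. The Actor's internal buffer starts filling with ByteString messages from the DStream as soon as the foreachRDD line is executed.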

I'm not sure which API versions were available when the question was asked, but now, with akka-stream 2.0.3, I believe you can do:

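import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source

val source = Source
  .actorRef[T](/* buffer size */ 100, OverflowStrategy.dropHead)
  .mapMaterializedValue[Unit] { actorRef =>
    //forward each batch's elements, not the RDD itself (collect() pulls them to the driver);
    //note that Spark rejects new output operations once the StreamingContext has started
    dstream.foreachRDD(rdd => rdd.collect().foreach(actorRef ! _))
  }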